Skip to content

Commit

Permalink
Use event handler to test port forwarding with skaffold dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Priya Wadhwa committed Aug 9, 2019
1 parent fcb3e8e commit ed8ff1f
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 118 deletions.
61 changes: 41 additions & 20 deletions integration/dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
"time"

"github.com/GoogleContainerTools/skaffold/integration/skaffold"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/GoogleContainerTools/skaffold/proto"
"github.com/GoogleContainerTools/skaffold/testutil"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
)

Expand Down Expand Up @@ -270,43 +272,62 @@ func TestDevPortForwardGKELoadBalancer(t *testing.T) {
waitForPortForwardEvent(t, entries, "gke-loadbalancer", "service", "hello!!\n")
}

func waitForPortForwardEvent(t *testing.T, entries chan *proto.LogEntry, resourceName, resourceType, expected string) {
func getLocalPortFromPortForwardEvent(t *testing.T, entries chan *proto.LogEntry, resourceName, resourceType string) int {
timeout := time.After(1 * time.Minute)
var port int32
portForwardEvent:
for {
select {
case <-timeout:
t.Errorf("timed out waiting for port forwarding event")
break portForwardEvent
t.Fatalf("timed out waiting for port forwarding event")
case e := <-entries:
switch e.Event.GetEventType().(type) {
case *proto.Event_PortEvent:
if e.Event.GetPortEvent().ResourceName == resourceName &&
e.Event.GetPortEvent().ResourceType == resourceType {
port = e.Event.GetPortEvent().LocalPort
port := e.Event.GetPortEvent().LocalPort
t.Logf("Detected %s/%s is forwarded to port %d", resourceType, resourceName, port)
break portForwardEvent
return int(port)
}
default:
t.Logf("event received %v", e)
}
}
}
var body []byte
err := wait.PollImmediate(time.Millisecond*2000, 1*time.Minute, func() (bool, error) {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d", port))
if err != nil {
t.Logf("could not get %s/%s due to %s", resourceType, resourceName, err)
return false, nil
}
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
t.Logf("got %s from port %d but wanted %s", string(body), port, expected)
return string(body) == expected, err
})
}

func waitForPortForwardEvent(t *testing.T, entries chan *proto.LogEntry, resourceName, resourceType, expected string) {
port := getLocalPortFromPortForwardEvent(t, entries, resourceName, resourceType)
assertResponseFromPort(t, port, expected)
}

testutil.CheckErrorAndDeepEqual(t, false, err, string(body), expected)
// assertResponseFromPort waits for two minutes for the expected response at port.
func assertResponseFromPort(t *testing.T, port int, expected string) {
ctx, cancelTimeout := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancelTimeout()

for {
select {
case <-ctx.Done():
t.Fatalf("Timed out waiting for response from port %d", port)

default:
time.Sleep(1 * time.Second)
resp, err := http.Get(fmt.Sprintf("http://%s:%d", util.Loopback, port))
if err != nil {
logrus.Infof("error getting response from port %d: %v", port, err)
continue
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
logrus.Infof("error reading response: %v", err)
continue
}
if string(body) == expected {
return
}
logrus.Infof("didn't get expected response from port. got: %s, expected: %s", string(body), expected)
}
}
}

func replaceInFile(target, replacement, filepath string) ([]byte, os.FileMode, error) {
Expand Down
121 changes: 48 additions & 73 deletions integration/port_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,16 @@ import (
"context"
"fmt"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
"github.com/GoogleContainerTools/skaffold/proto"
"github.com/sirupsen/logrus"
"io/ioutil"
"net/http"
"os"
"testing"
"time"

"github.com/GoogleContainerTools/skaffold/integration/skaffold"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubectl"
kubectx "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes/context"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes/portforward"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/runcontext"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
)

func TestPortForward(t *testing.T) {
Expand Down Expand Up @@ -71,91 +66,71 @@ func TestPortForward(t *testing.T) {
// as expected. Then, the test force deletes a pod,
// and tests that the pod eventually comes up at the same port again.
func TestPortForwardDeletePod(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
if ShouldRunGCPOnlyTests() {
t.Skip("skipping test that is not gcp only")
}
// if testing.Short() {
// t.Skip("skipping integration test")
// }
// if !ShouldRunGCPOnlyTests() {
// t.Skip("skipping test that is not gcp only")
// }

ns, _, deleteNs := SetupNamespace(t)
defer deleteNs()

dir := "examples/microservices"
skaffold.Run().InDir(dir).InNs(ns.Name).RunOrFailOutput(t)
rpcAddr := randomPort()
env := []string{fmt.Sprintf("TEST_NS=%s", ns.Name)}
cmd := skaffold.Dev("--cache-artifacts=true", "--port-forward", "--rpc-port", rpcAddr, "-v=trace").InDir("examples/microservices").InNs(ns.Name).WithEnv(env)
stop := cmd.RunBackground(t)
defer stop()

cfg, err := kubectx.CurrentConfig()
if err != nil {
t.Fatal(err)
}

kubectlCLI := kubectl.NewFromRunContext(&runcontext.RunContext{
KubeContext: cfg.CurrentContext,
Opts: config.SkaffoldOptions{
Namespace: ns.Name,
},
})
client, shutdown := setupRPCClient(t, rpcAddr)
defer shutdown()

em := portforward.NewEntryManager(os.Stdout, kubectlCLI)
defer em.Stop()

pfe := portforward.NewPortForwardEntry(em, latest.PortForwardResource{
Type: "deployment",
Name: "leeroy-web",
Namespace: ns.Name,
Port: 8080,
})

cleanup := portforward.OverridePortForwardEvent()
defer cleanup()

logrus.SetLevel(logrus.TraceLevel)
// create a grpc connection. Increase number of reties for helm.
stream, err := readEventAPIStream(client, t, 20)
if stream == nil {
t.Fatalf("error retrieving event log: %v\n", err)
}

// Start port forwarding
portforward.ForwardPortForwardEntry(em, pfe)
// read entries from the log
entries := make(chan *proto.LogEntry)
go func() {
for {
entry, _ := stream.Recv()
if entry != nil {
entries <- entry
}
}
}()

waitForResponseFromPort(t, pfe.LocalPort(), constants.LeeroyAppResponse)
localPort := getLocalPortFromPortForwardEvent(t, entries, "leeroy-app", "service")
assertResponseFromPort(t, localPort, constants.LeeroyAppResponse)

// now, delete all pods in this namespace.
cmd := kubectlCLI.Command(context.Background(),
kubectlCLI := getKubectlCLI(t, ns.Name)

killPodsCmd := kubectlCLI.Command(context.Background(),
"delete",
"pods", "--all",
"-n", ns.Name,
)
if output, err := cmd.CombinedOutput(); err != nil {

if output, err := killPodsCmd.CombinedOutput(); err != nil {
t.Fatalf("error deleting all pods: %v \n %s", err, string(output))
}
// port forwarding should come up again on the same port
waitForResponseFromPort(t, pfe.LocalPort(), constants.LeeroyAppResponse)
assertResponseFromPort(t, localPort, constants.LeeroyAppResponse)
}

// waitForResponseFromPort waits for two minutes for the expected response at port.
func waitForResponseFromPort(t *testing.T, port int, expected string) {
ctx, cancelTimeout := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancelTimeout()

for {
select {
case <-ctx.Done():
t.Fatalf("Timed out waiting for response from port %d", port)

default:
time.Sleep(1 * time.Second)
resp, err := http.Get(fmt.Sprintf("http://%s:%d", util.Loopback, port))
if err != nil {
logrus.Infof("error getting response from port %d: %v", port, err)
continue
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
logrus.Infof("error reading response: %v", err)
continue
}
if string(body) == expected {
return
}
logrus.Infof("didn't get expected response from port. got: %s, expected: %s", string(body), expected)
}
func getKubectlCLI(t *testing.T, ns string) *kubectl.CLI {
cfg, err := kubectx.CurrentConfig()
if err != nil {
t.Fatal(err)
}

return kubectl.NewFromRunContext(&runcontext.RunContext{
KubeContext: cfg.CurrentContext,
Opts: config.SkaffoldOptions{
Namespace: ns,
},
})
}
1 change: 1 addition & 0 deletions pkg/skaffold/kubernetes/portforward/entry_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

var (
portForwardEvent = func(entry *portForwardEntry) {
fmt.Println("PORT FORWARDING!!!!!!!!!!!!!!!!!!!", entry.resource.Name, entry.resource.Type, entry.localPort)
// TODO priyawadhwa@, change event API to accept ports of type int
event.PortForwarded(
int32(entry.localPort),
Expand Down
4 changes: 0 additions & 4 deletions pkg/skaffold/kubernetes/portforward/port_forward_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ type portForwardEntry struct {
cancel context.CancelFunc
}

func (p *portForwardEntry) LocalPort() int {
return p.localPort
}

// key is an identifier for the lock on a port during the skaffold dev cycle.
// if automaticPodForwarding is set, we return a key that doesn't include podName, since we want the key
// to be the same whenever pods restart
Expand Down
24 changes: 3 additions & 21 deletions pkg/skaffold/kubernetes/portforward/port_forward_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,13 @@ import (
"time"
)

func NewPortForwardEntry(em EntryManager, resource latest.PortForwardResource) *portForwardEntry {
localPort := retrieveAvailablePort(9000, em.forwardedPorts)
return &portForwardEntry{
resource: resource,
localPort: localPort,
terminationLock: &sync.Mutex{},
}
}

func ForwardPortForwardEntry(em EntryManager, pfe *portForwardEntry) {
em.forwardPortForwardEntry(context.Background(), pfe)
}

func OverridePortForwardEvent() func() {
portForwardEventHandler := portForwardEvent
portForwardEvent = func(entry *portForwardEntry) {}
return func() { portForwardEvent = portForwardEventHandler }
}

// For WhiteBox testing only
// This is testing a port forward + stop + restart in a simulated dev cycle
func WhiteBoxPortForwardCycle(t *testing.T, kubectlCLI *kubectl.CLI, namespace string) {
em := NewEntryManager(os.Stdout, kubectlCLI)
cleanup := OverridePortForwardEvent()
defer cleanup()
portForwardEventHandler := portForwardEvent
defer func() { portForwardEvent = portForwardEventHandler }()
portForwardEvent = func(entry *portForwardEntry) {}
ctx := context.Background()
localPort := retrieveAvailablePort(9000, em.forwardedPorts)
pfe := &portForwardEntry{
Expand Down

0 comments on commit ed8ff1f

Please sign in to comment.