Skip to content

Commit

Permalink
Only listen to pods for the current RunID
Browse files Browse the repository at this point in the history
Fixes GoogleContainerTools#1753

Signed-off-by: David Gageot <david@gageot.net>
  • Loading branch information
dgageot committed May 4, 2020
1 parent 35b18f4 commit 57e7ecd
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 40 deletions.
27 changes: 18 additions & 9 deletions pkg/skaffold/kubernetes/debugging/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,35 @@ var (
)

type ContainerManager struct {
output io.Writer
cli *kubectl.CLI
podSelector kubernetes.PodSelector
namespaces []string
active map[string]string // set of containers that have been notified
aggregate chan watch.Event
output io.Writer
cli *kubectl.CLI
podSelector kubernetes.PodSelector
labelSelector string
namespaces []string
active map[string]string // set of containers that have been notified
aggregate chan watch.Event
}

func NewContainerManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes.PodSelector, namespaces []string) *ContainerManager {
func NewContainerManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes.PodSelector, labelSelector string, namespaces []string) *ContainerManager {
// Create the channel here as Stop() may be called before Start() when a build fails, thus
// avoiding the possibility of closing a nil channel. Channels are cheap.
return &ContainerManager{output: out, cli: cli, podSelector: podSelector, namespaces: namespaces, active: map[string]string{}, aggregate: make(chan watch.Event)}
return &ContainerManager{
output: out,
cli: cli,
podSelector: podSelector,
labelSelector: labelSelector,
namespaces: namespaces,
active: map[string]string{},
aggregate: make(chan watch.Event),
}
}

func (d *ContainerManager) Start(ctx context.Context) error {
if d == nil {
// debug mode probably not enabled
return nil
}
stopWatchers, err := aggregatePodWatcher(d.namespaces, d.aggregate)
stopWatchers, err := aggregatePodWatcher(d.labelSelector, d.namespaces, d.aggregate)
if err != nil {
stopWatchers()
return fmt.Errorf("initializing debugging container watcher: %w", err)
Expand Down
26 changes: 14 additions & 12 deletions pkg/skaffold/kubernetes/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ import (

// LogAggregator aggregates the logs for all the deployed pods.
type LogAggregator struct {
output io.Writer
kubectlcli *kubectl.CLI
podSelector PodSelector
namespaces []string
colorPicker ColorPicker
output io.Writer
kubectlcli *kubectl.CLI
podSelector PodSelector
labelSelector string
namespaces []string
colorPicker ColorPicker

muted int32
sinceTime time.Time
Expand All @@ -49,13 +50,14 @@ type LogAggregator struct {
}

// NewLogAggregator creates a new LogAggregator for a given output.
func NewLogAggregator(out io.Writer, cli *kubectl.CLI, baseImageNames []string, podSelector PodSelector, namespaces []string) *LogAggregator {
func NewLogAggregator(out io.Writer, cli *kubectl.CLI, baseImageNames []string, podSelector PodSelector, labelSelector string, namespaces []string) *LogAggregator {
return &LogAggregator{
output: out,
kubectlcli: cli,
podSelector: podSelector,
namespaces: namespaces,
colorPicker: NewColorPicker(baseImageNames),
output: out,
kubectlcli: cli,
podSelector: podSelector,
labelSelector: labelSelector,
namespaces: namespaces,
colorPicker: NewColorPicker(baseImageNames),
trackedContainers: trackedContainers{
ids: map[string]bool{},
},
Expand All @@ -73,7 +75,7 @@ func (a *LogAggregator) Start(ctx context.Context) error {
a.cancel = cancel

aggregate := make(chan watch.Event)
stopWatchers, err := AggregatePodWatcher(a.namespaces, aggregate)
stopWatchers, err := AggregatePodWatcher(a.labelSelector, a.namespaces, aggregate)
if err != nil {
stopWatchers()
return fmt.Errorf("initializing aggregate pod watcher: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/skaffold/kubernetes/portforward/forwarder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
)

// NewForwarderManager returns a new port manager which handles starting and stopping port forwarding
func NewForwarderManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes.PodSelector, namespaces []string, label string, opts config.PortForwardOptions, userDefined []*latest.PortForwardResource) *ForwarderManager {
func NewForwarderManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes.PodSelector, labelSelector string, namespaces []string, opts config.PortForwardOptions, userDefined []*latest.PortForwardResource) *ForwarderManager {
if !opts.Enabled {
return emptyForwarderManager
}
Expand All @@ -54,11 +54,11 @@ func NewForwarderManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes

ForwarderManager := &ForwarderManager{
output: out,
Forwarders: []Forwarder{NewResourceForwarder(em, namespaces, label, userDefined)},
Forwarders: []Forwarder{NewResourceForwarder(em, namespaces, labelSelector, userDefined)},
}

if opts.ForwardPods {
f := NewWatchingPodForwarder(em, podSelector, namespaces)
f := NewWatchingPodForwarder(em, podSelector, labelSelector, namespaces)
ForwarderManager.Forwarders = append(ForwarderManager.Forwarders, f)
}

Expand Down
16 changes: 9 additions & 7 deletions pkg/skaffold/kubernetes/portforward/pod_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,24 @@ var (
// container ports within those pods. It also tracks and manages the port-forward connections.
type WatchingPodForwarder struct {
EntryManager
namespaces []string
podSelector kubernetes.PodSelector
namespaces []string
podSelector kubernetes.PodSelector
labelSelector string
}

// NewWatchingPodForwarder returns a struct that tracks and port-forwards pods as they are created and modified
func NewWatchingPodForwarder(em EntryManager, podSelector kubernetes.PodSelector, namespaces []string) *WatchingPodForwarder {
func NewWatchingPodForwarder(em EntryManager, podSelector kubernetes.PodSelector, labelSelector string, namespaces []string) *WatchingPodForwarder {
return &WatchingPodForwarder{
EntryManager: em,
podSelector: podSelector,
namespaces: namespaces,
EntryManager: em,
podSelector: podSelector,
labelSelector: labelSelector,
namespaces: namespaces,
}
}

func (p *WatchingPodForwarder) Start(ctx context.Context) error {
aggregate := make(chan watch.Event)
stopWatchers, err := aggregatePodWatcher(p.namespaces, aggregate)
stopWatchers, err := aggregatePodWatcher(p.labelSelector, p.namespaces, aggregate)
if err != nil {
stopWatchers()
return fmt.Errorf("initializing pod watcher: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/skaffold/kubernetes/portforward/pod_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func TestAutomaticPortForwardPod(t *testing.T) {
forwardedPorts: newForwardedPorts(),
forwardedResources: newForwardedResources(),
}
p := NewWatchingPodForwarder(entryManager, kubernetes.NewImageList(), nil)
p := NewWatchingPodForwarder(entryManager, kubernetes.NewImageList(), "", nil)
if test.forwarder == nil {
test.forwarder = newTestForwarder()
}
Expand Down Expand Up @@ -473,7 +473,7 @@ func TestStartPodForwarder(t *testing.T) {
client.PrependWatchReactor("*", testutil.SetupFakeWatcher(fakeWatcher))

waitForWatcher := make(chan bool)
t.Override(&aggregatePodWatcher, func(_ []string, aggregate chan<- watch.Event) (func(), error) {
t.Override(&aggregatePodWatcher, func(_ string, _ []string, aggregate chan<- watch.Event) (func(), error) {
go func() {
waitForWatcher <- true
for msg := range fakeWatcher.ResultChan() {
Expand All @@ -487,7 +487,7 @@ func TestStartPodForwarder(t *testing.T) {
imageList := kubernetes.NewImageList()
imageList.Add("image")

p := NewWatchingPodForwarder(NewEntryManager(ioutil.Discard, nil), imageList, nil)
p := NewWatchingPodForwarder(NewEntryManager(ioutil.Discard, nil), imageList, "", nil)
fakeForwarder := newTestForwarder()
p.EntryForwarder = fakeForwarder
p.Start(context.Background())
Expand Down
3 changes: 2 additions & 1 deletion pkg/skaffold/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// AggregatePodWatcher returns a watcher for multiple namespaces.
func AggregatePodWatcher(namespaces []string, aggregate chan<- watch.Event) (func(), error) {
func AggregatePodWatcher(labelSelector string, namespaces []string, aggregate chan<- watch.Event) (func(), error) {
watchers := make([]watch.Interface, 0, len(namespaces))
stopWatchers := func() {
for _, w := range watchers {
Expand All @@ -42,6 +42,7 @@ func AggregatePodWatcher(namespaces []string, aggregate chan<- watch.Event) (fun
for _, ns := range namespaces {
watcher, err := kubeclient.CoreV1().Pods(ns).Watch(metav1.ListOptions{
TimeoutSeconds: &forever,
LabelSelector: labelSelector,
})
if err != nil {
stopWatchers()
Expand Down
6 changes: 3 additions & 3 deletions pkg/skaffold/kubernetes/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestAggregatePodWatcher(t *testing.T) {
testutil.Run(t, "fail to get client", func(t *testutil.T) {
t.Override(&Client, func() (kubernetes.Interface, error) { return nil, errors.New("unable to get client") })

cleanup, err := AggregatePodWatcher([]string{"ns"}, nil)
cleanup, err := AggregatePodWatcher("", []string{"ns"}, nil)
defer cleanup()

t.CheckErrorContains("unable to get client", err)
Expand All @@ -52,7 +52,7 @@ func TestAggregatePodWatcher(t *testing.T) {
return true, nil, errors.New("unable to watch")
})

cleanup, err := AggregatePodWatcher([]string{"ns"}, nil)
cleanup, err := AggregatePodWatcher("", []string{"ns"}, nil)
defer cleanup()

t.CheckErrorContains("unable to watch", err)
Expand All @@ -63,7 +63,7 @@ func TestAggregatePodWatcher(t *testing.T) {
t.Override(&Client, func() (kubernetes.Interface, error) { return clientset, nil })

events := make(chan watch.Event)
cleanup, err := AggregatePodWatcher([]string{"ns1", "ns2"}, events)
cleanup, err := AggregatePodWatcher("", []string{"ns1", "ns2"}, events)
defer cleanup()
t.CheckNoError(err)

Expand Down
1 change: 1 addition & 0 deletions pkg/skaffold/runner/debugging.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ func (r *SkaffoldRunner) createContainerManager(out io.Writer) {
out,
kubectlCLI,
r.podSelector,
r.defaultLabeller.RunIDSelector(),
r.runCtx.Namespaces)
}
2 changes: 1 addition & 1 deletion pkg/skaffold/runner/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ func (r *SkaffoldRunner) createLogger(out io.Writer, artifacts []*latest.Artifac

func (r *SkaffoldRunner) createLoggerForImages(out io.Writer, images []string) {
kubectlCLI := kubectl.NewFromRunContext(r.runCtx)
r.logger = kubernetes.NewLogAggregator(out, kubectlCLI, images, r.podSelector, r.runCtx.Namespaces)
r.logger = kubernetes.NewLogAggregator(out, kubectlCLI, images, r.podSelector, r.defaultLabeller.RunIDSelector(), r.runCtx.Namespaces)
}
2 changes: 1 addition & 1 deletion pkg/skaffold/runner/portforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (r *SkaffoldRunner) createForwarder(out io.Writer) {
r.forwarderManager = portforward.NewForwarderManager(out,
kubectlCLI,
r.podSelector,
r.runCtx.Namespaces,
r.defaultLabeller.RunIDSelector(),
r.runCtx.Namespaces,
r.runCtx.Opts.PortForward,
r.portForwardResources)
}

0 comments on commit 57e7ecd

Please sign in to comment.