Skip to content

Commit

Permalink
Merge pull request #3016 from tejal29/add_pod_counter
Browse files Browse the repository at this point in the history
add a resourceCounter to track pods
  • Loading branch information
balopat authored Dec 18, 2019
2 parents 3186964 + a7df42c commit f408bce
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 12 deletions.
50 changes: 40 additions & 10 deletions pkg/skaffold/deploy/status_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ const (
tabHeader = " -"
)

type resourceCounter struct {
deployments *counter
pods *counter
}

type counter struct {
total int
pending int32
Expand All @@ -72,15 +77,15 @@ func StatusCheck(ctx context.Context, defaultLabeller *DefaultLabeller, runCtx *

var wg sync.WaitGroup

c := newCounter(len(deployments))
rc := newResourceCounter(len(deployments))

for _, d := range deployments {
wg.Add(1)
go func(r Resource) {
defer wg.Done()
pollResourceStatus(ctx, runCtx, r)
pending := c.markProcessed(r.Status().Error())
printStatusCheckSummary(out, r, pending, c.total)
rcCopy := rc.markProcessed(r.Status().Error())
printStatusCheckSummary(out, r, rcCopy)
}(d)
}

Expand All @@ -91,7 +96,7 @@ func StatusCheck(ctx context.Context, defaultLabeller *DefaultLabeller, runCtx *

// Wait for all deployment status to be fetched
wg.Wait()
return getSkaffoldDeployStatus(c)
return getSkaffoldDeployStatus(rc.deployments)
}

func getDeployments(client kubernetes.Interface, ns string, l *DefaultLabeller, deadlineDuration time.Duration) ([]Resource, error) {
Expand Down Expand Up @@ -154,18 +159,18 @@ func getDeadline(d int) time.Duration {
return defaultStatusCheckDeadline
}

func printStatusCheckSummary(out io.Writer, r Resource, pending int, total int) {
func printStatusCheckSummary(out io.Writer, r Resource, rc resourceCounter) {
status := fmt.Sprintf("%s %s", tabHeader, r)
if err := r.Status().Error(); err != nil {
event.ResourceStatusCheckEventFailed(r.String(), err)
status = fmt.Sprintf("%s failed.%s Error: %s.",
status,
trimNewLine(getPendingMessage(pending, total)),
trimNewLine(getPendingMessage(rc.deployments.pending, rc.deployments.total)),
trimNewLine(err.Error()),
)
} else {
event.ResourceStatusCheckEventSucceeded(r.String())
status = fmt.Sprintf("%s is ready.%s", status, getPendingMessage(pending, total))
status = fmt.Sprintf("%s is ready.%s", status, getPendingMessage(rc.deployments.pending, rc.deployments.total))
}
color.Default.Fprintln(out, status)
}
Expand Down Expand Up @@ -203,7 +208,7 @@ func printStatus(resources []Resource, out io.Writer) bool {
return allResourcesCheckComplete
}

func getPendingMessage(pending int, total int) string {
func getPendingMessage(pending int32, total int) string {
if pending > 0 {
return fmt.Sprintf(" [%d/%d deployment(s) still pending]", pending, total)
}
Expand All @@ -221,9 +226,34 @@ func newCounter(i int) *counter {
}
}

func (c *counter) markProcessed(err error) int {
func (c *counter) markProcessed(err error) counter {
if err != nil {
atomic.AddInt32(&c.failed, 1)
}
return int(atomic.AddInt32(&c.pending, -1))
atomic.AddInt32(&c.pending, -1)
return c.copy()
}

func (c *counter) copy() counter {
return counter{
total: c.total,
pending: c.pending,
failed: c.failed,
}
}

func newResourceCounter(d int) *resourceCounter {
return &resourceCounter{
deployments: newCounter(d),
pods: newCounter(0),
}
}

func (c *resourceCounter) markProcessed(err error) resourceCounter {
depCp := c.deployments.markProcessed(err)
podCp := c.pods.copy()
return resourceCounter{
deployments: &depCp,
pods: &podCp,
}
}
74 changes: 72 additions & 2 deletions pkg/skaffold/deploy/status_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,12 @@ func TestPrintSummaryStatus(t *testing.T) {
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
out := new(bytes.Buffer)
rc := newResourceCounter(10)
rc.deployments.pending = test.pending
printStatusCheckSummary(
out,
withStatus(resource.NewDeployment("dep", "test", 0), "", test.err),
int(test.pending),
10,
*rc,
)
t.CheckDeepEqual(test.expected, out.String())
})
Expand Down Expand Up @@ -414,3 +415,72 @@ func withStatus(d *resource.Deployment, details string, err error) *resource.Dep
d.UpdateStatus(details, err)
return d
}

func TestCounterCopy(t *testing.T) {
tests := []struct {
description string
c *counter
expected counter
}{
{
description: "initial counter is copied correctly ",
c: newCounter(10),
expected: *newCounter(10),
},
{
description: "counter with updated pending is copied correctly",
c: &counter{total: 10, pending: 2},
expected: counter{total: 10, pending: 2},
},
{
description: "counter with updated failed and pending is copied correctly",
c: &counter{total: 10, pending: 5, failed: 3},
expected: counter{total: 10, pending: 5, failed: 3},
},
}
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
t.CheckDeepEqual(test.expected, test.c.copy(), cmp.AllowUnexported(counter{}))
})
}
}

func TestResourceMarkProcessed(t *testing.T) {
tests := []struct {
description string
c *resourceCounter
err error
expected resourceCounter
}{
{
description: "when deployment failed, counter is updated",
c: newResourceCounter(10),
err: errors.New("some err"),
expected: resourceCounter{
deployments: &counter{total: 10, failed: 1, pending: 9},
pods: newCounter(0),
},
},
{
description: "when deployment is successful, counter is updated",
c: newResourceCounter(10),
expected: resourceCounter{
deployments: &counter{total: 10, failed: 0, pending: 9},
pods: newCounter(0),
},
},
{
description: "counter when 1 deployment is updated correctly",
c: newResourceCounter(1),
expected: resourceCounter{
deployments: &counter{total: 1, failed: 0, pending: 0},
pods: newCounter(0),
},
},
}
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
t.CheckDeepEqual(test.expected, test.c.markProcessed(test.err), cmp.AllowUnexported(resourceCounter{}, counter{}))
})
}
}

0 comments on commit f408bce

Please sign in to comment.