Skip to content

Commit

Permalink
Merge pull request #4223 from tejal29/hook_pod_check
Browse files Browse the repository at this point in the history
Initial prototype for pod health check hook up
  • Loading branch information
tejal29 authored Jun 5, 2020
2 parents 08b086c + 3ae7d5e commit adf6da6
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 30 deletions.
97 changes: 80 additions & 17 deletions pkg/skaffold/deploy/resource/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,24 @@ import (
"strings"
"time"

"github.com/sirupsen/logrus"

"github.com/GoogleContainerTools/skaffold/pkg/diag"
"github.com/GoogleContainerTools/skaffold/pkg/diag/validator"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubectl"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/runcontext"
"github.com/GoogleContainerTools/skaffold/proto"
)

const (
deploymentType = "deployment"
rollOutSuccess = "successfully rolled out"
connectionErrMsg = "Unable to connect to the server"
killedErrMsg = "signal: killed"
deploymentType = "deployment"
rollOutSuccess = "successfully rolled out"
connectionErrMsg = "Unable to connect to the server"
killedErrMsg = "signal: killed"
defaultPodCheckDeadline = 30 * time.Second
tabHeader = " -"
tab = " "
)

var (
Expand All @@ -40,20 +49,26 @@ var (
)

type Deployment struct {
name string
namespace string
rType string
status Status
done bool
deadline time.Duration
name string
namespace string
rType string
status Status
done bool
deadline time.Duration
pods map[string]validator.Resource
podValidator diag.Diagnose
}

func (d *Deployment) Deadline() time.Duration {
return d.deadline
}

func (d *Deployment) UpdateStatus(details string, err error) {
updated := newStatus(details, err)
errCode := proto.StatusCode_STATUSCHECK_SUCCESS
if err != nil {
errCode = proto.StatusCode_STATUSCHECK_UNKNOWN
}
updated := newStatus(details, errCode, err)
if d.status.Equal(updated) {
d.status.changed = false
return
Expand All @@ -67,14 +82,20 @@ func (d *Deployment) UpdateStatus(details string, err error) {

func NewDeployment(name string, ns string, deadline time.Duration) *Deployment {
return &Deployment{
name: name,
namespace: ns,
rType: deploymentType,
status: newStatus("", nil),
deadline: deadline,
name: name,
namespace: ns,
rType: deploymentType,
status: newStatus("", proto.StatusCode_STATUSCHECK_UNKNOWN, nil),
deadline: deadline,
podValidator: diag.New(nil),
}
}

func (d *Deployment) WithValidator(pd diag.Diagnose) *Deployment {
d.podValidator = pd
return d
}

func (d *Deployment) CheckStatus(ctx context.Context, runCtx *runcontext.RunContext) {
kubeCtl := kubectl.NewFromRunContext(runCtx)

Expand All @@ -91,6 +112,9 @@ func (d *Deployment) CheckStatus(ctx context.Context, runCtx *runcontext.RunCont
}

d.UpdateStatus(details, err)
if err := d.fetchPods(ctx); err != nil {
logrus.Debugf("pod statuses could be fetched this time due to %s", err)
}
}

func (d *Deployment) String() string {
Expand All @@ -113,12 +137,26 @@ func (d *Deployment) IsStatusCheckComplete() bool {
return d.done
}

// This returns a string representing deployment status along with tab header
// e.g.
// - testNs:deployment/leeroy-app: waiting for rollout to complete. (1/2) pending
// - testNs:pod/leeroy-app-xvbg : error pulling container image
func (d *Deployment) ReportSinceLastUpdated() string {
if d.status.reported && !d.status.changed {
return ""
}
d.status.reported = true
return fmt.Sprintf("%s: %s", d, d.status)
if d.status.String() == "" {
return ""
}
var result strings.Builder
result.WriteString(fmt.Sprintf("%s %s: %s", tabHeader, d, d.status))
for _, p := range d.pods {
if p.Error() != nil {
result.WriteString(fmt.Sprintf("%s %s %s: %s\n", tab, tabHeader, p, p.Error()))
}
}
return result.String()
}

func (d *Deployment) cleanupStatus(msg string) string {
Expand Down Expand Up @@ -148,3 +186,28 @@ func isErrAndNotRetryAble(err error) bool {
}
return err != ErrKubectlConnection
}

func (d *Deployment) fetchPods(ctx context.Context) error {
timeoutContext, cancel := context.WithTimeout(ctx, defaultPodCheckDeadline)
defer cancel()
pods, err := d.podValidator.Run(timeoutContext)
if err != nil {
return err
}

newPods := map[string]validator.Resource{}
d.status.changed = false
for _, p := range pods {
originalPod, ok := d.pods[p.String()]
if !ok {
d.status.changed = true
event.ResourceStatusCheckEventCompleted(p.String(), p.Error())
} else if originalPod.StatusCode != p.StatusCode {
d.status.changed = true
event.ResourceStatusCheckEventCompleted(p.String(), p.Error())
}
newPods[p.String()] = p
}
d.pods = newPods
return nil
}
8 changes: 4 additions & 4 deletions pkg/skaffold/deploy/resource/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ func TestReportSinceLastUpdated(t *testing.T) {
description: "updating an error status",
message: "cannot pull image",
err: errors.New("cannot pull image"),
expected: "test-ns:deployment/test: cannot pull image",
expected: " - test-ns:deployment/test: cannot pull image",
},
{
description: "updating a non error status",
message: "waiting for container",
expected: "test-ns:deployment/test: waiting for container",
expected: " - test-ns:deployment/test: waiting for container",
},
}
for _, test := range tests {
Expand All @@ -222,7 +222,7 @@ func TestReportSinceLastUpdatedMultipleTimes(t *testing.T) {
description: "report first time should return status",
statuses: []string{"cannot pull image"},
reportStatusSeq: []bool{true},
expected: "test-ns:deployment/test: cannot pull image",
expected: " - test-ns:deployment/test: cannot pull image",
},
{
description: "report 2nd time should not return when same status",
Expand All @@ -234,7 +234,7 @@ func TestReportSinceLastUpdatedMultipleTimes(t *testing.T) {
description: "report called after multiple changes but last status was not changed.",
statuses: []string{"cannot pull image", "changed but not reported", "changed but not reported", "changed but not reported"},
reportStatusSeq: []bool{true, false, false, true},
expected: "test-ns:deployment/test: changed but not reported",
expected: " - test-ns:deployment/test: changed but not reported",
},
}
for _, test := range tests {
Expand Down
12 changes: 11 additions & 1 deletion pkg/skaffold/deploy/resource/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ limitations under the License.

package resource

import (
"github.com/GoogleContainerTools/skaffold/proto"
)

type Status struct {
err error
details string
errCode proto.StatusCode
changed bool
reported bool
}
Expand All @@ -27,6 +32,10 @@ func (rs Status) Error() error {
return rs.err
}

func (rs Status) ErrorCode() proto.StatusCode {
return rs.errCode
}

func (rs Status) String() string {
if rs.err != nil {
return rs.err.Error()
Expand All @@ -44,10 +53,11 @@ func (rs Status) Equal(other Status) bool {
return rs.err == other.err
}

func newStatus(msg string, err error) Status {
func newStatus(msg string, errCode proto.StatusCode, err error) Status {
return Status{
details: msg,
err: err,
errCode: errCode,
changed: true,
}
}
2 changes: 1 addition & 1 deletion pkg/skaffold/deploy/resource/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestString(t *testing.T) {
}
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
status := newStatus(test.details, test.err)
status := newStatus(test.details, 0, test.err)
t.CheckDeepEqual(test.expected, status.String())
})
}
Expand Down
23 changes: 17 additions & 6 deletions pkg/skaffold/deploy/status_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/GoogleContainerTools/skaffold/pkg/diag"
"github.com/GoogleContainerTools/skaffold/pkg/diag/validator"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy/resource"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
pkgkubernetes "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes"
Expand Down Expand Up @@ -85,18 +87,19 @@ func statusCheck(ctx context.Context, defaultLabeller *DefaultLabeller, runCtx *
wg.Add(1)
go func(r *resource.Deployment) {
defer wg.Done()
// keep updating the resource status until it fails/succeeds/times out
pollDeploymentStatus(ctx, runCtx, r)
rcCopy := c.markProcessed(r.Status().Error())
printStatusCheckSummary(out, r, rcCopy)
}(d)
}

// Retrieve pending resource states
// Retrieve pending deployments statuses
go func() {
printDeploymentStatus(ctx, out, deployments, deadline)
}()

// Wait for all deployment status to be fetched
// Wait for all deployment statuses to be fetched
wg.Wait()
return getSkaffoldDeployStatus(c)
}
Expand All @@ -109,15 +112,23 @@ func getDeployments(client kubernetes.Interface, ns string, l *DefaultLabeller,
return nil, fmt.Errorf("could not fetch deployments: %w", err)
}

deployments := make([]*resource.Deployment, 0, len(deps.Items))
for _, d := range deps.Items {
deployments := make([]*resource.Deployment, len(deps.Items))
for i, d := range deps.Items {
var deadline time.Duration
if d.Spec.ProgressDeadlineSeconds == nil || *d.Spec.ProgressDeadlineSeconds == kubernetesMaxDeadline {
deadline = deadlineDuration
} else {
deadline = time.Duration(*d.Spec.ProgressDeadlineSeconds) * time.Second
}
deployments = append(deployments, resource.NewDeployment(d.Name, d.Namespace, deadline))
pd := diag.New([]string{d.Namespace}).
WithLabel(RunIDLabel, l.Labels()[RunIDLabel]).
WithValidators([]validator.Validator{validator.NewPodValidator(client)})

for k, v := range d.Spec.Template.Labels {
pd = pd.WithLabel(k, v)
}

deployments[i] = resource.NewDeployment(d.Name, d.Namespace, deadline).WithValidator(pd)
}

return deployments, nil
Expand Down Expand Up @@ -208,7 +219,7 @@ func printStatus(deployments []*resource.Deployment, out io.Writer) bool {
allDone = false
if str := r.ReportSinceLastUpdated(); str != "" {
event.ResourceStatusCheckEventUpdated(r.String(), str)
fmt.Fprintln(out, tabHeader, trimNewLine(str))
fmt.Fprintln(out, trimNewLine(str))
}
}
return allDone
Expand Down
5 changes: 4 additions & 1 deletion pkg/skaffold/deploy/status_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
utilpointer "k8s.io/utils/pointer"

"github.com/GoogleContainerTools/skaffold/pkg/diag"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy/resource"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
Expand Down Expand Up @@ -214,7 +216,8 @@ func TestGetDeployments(t *testing.T) {
client := fakekubeclientset.NewSimpleClientset(objs...)
actual, err := getDeployments(client, "test", labeller, 200*time.Second)
t.CheckErrorAndDeepEqual(test.shouldErr, err, &test.expected, &actual,
cmp.AllowUnexported(resource.Deployment{}, resource.Status{}))
cmp.AllowUnexported(resource.Deployment{}, resource.Status{}),
cmpopts.IgnoreInterfaces(struct{ diag.Diagnose }{}))
})
}
}
Expand Down

0 comments on commit adf6da6

Please sign in to comment.