Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial prototype for pod health check hook up #4223

Merged
merged 14 commits into from
Jun 5, 2020
97 changes: 80 additions & 17 deletions pkg/skaffold/deploy/resource/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,24 @@ import (
"strings"
"time"

"github.com/sirupsen/logrus"

"github.com/GoogleContainerTools/skaffold/pkg/diag"
"github.com/GoogleContainerTools/skaffold/pkg/diag/validator"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubectl"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/runcontext"
"github.com/GoogleContainerTools/skaffold/proto"
)

const (
deploymentType = "deployment"
rollOutSuccess = "successfully rolled out"
connectionErrMsg = "Unable to connect to the server"
killedErrMsg = "signal: killed"
deploymentType = "deployment"
rollOutSuccess = "successfully rolled out"
connectionErrMsg = "Unable to connect to the server"
killedErrMsg = "signal: killed"
defaultPodCheckDeadline = 30 * time.Second
tabHeader = " -"
tab = " "
)

var (
Expand All @@ -40,20 +49,26 @@ var (
)

type Deployment struct {
name string
namespace string
rType string
status Status
done bool
deadline time.Duration
name string
namespace string
rType string
status Status
done bool
deadline time.Duration
pods map[string]validator.Resource
podValidator diag.Diagnose
}

func (d *Deployment) Deadline() time.Duration {
return d.deadline
}

func (d *Deployment) UpdateStatus(details string, err error) {
updated := newStatus(details, err)
errCode := proto.StatusCode_STATUSCHECK_SUCCESS
if err != nil {
errCode = proto.StatusCode_STATUSCHECK_UNKNOWN
}
updated := newStatus(details, errCode, err)
if d.status.Equal(updated) {
d.status.changed = false
return
Expand All @@ -67,14 +82,20 @@ func (d *Deployment) UpdateStatus(details string, err error) {

func NewDeployment(name string, ns string, deadline time.Duration) *Deployment {
return &Deployment{
name: name,
namespace: ns,
rType: deploymentType,
status: newStatus("", nil),
deadline: deadline,
name: name,
namespace: ns,
rType: deploymentType,
status: newStatus("", proto.StatusCode_STATUSCHECK_UNKNOWN, nil),
deadline: deadline,
podValidator: diag.New(nil),
}
}

func (d *Deployment) WithValidator(pd diag.Diagnose) *Deployment {
d.podValidator = pd
return d
}

func (d *Deployment) CheckStatus(ctx context.Context, runCtx *runcontext.RunContext) {
kubeCtl := kubectl.NewFromRunContext(runCtx)

Expand All @@ -91,6 +112,9 @@ func (d *Deployment) CheckStatus(ctx context.Context, runCtx *runcontext.RunCont
}

d.UpdateStatus(details, err)
if err := d.fetchPods(ctx); err != nil {
logrus.Debugf("pod statuses could be fetched this time due to %s", err)
}
}

func (d *Deployment) String() string {
Expand All @@ -113,12 +137,26 @@ func (d *Deployment) IsStatusCheckComplete() bool {
return d.done
}

// This returns a string representing deployment status along with tab header
// e.g.
// - testNs:deployment/leeroy-app: waiting for rollout to complete. (1/2) pending
// - testNs:pod/leeroy-app-xvbg : error pulling container image
func (d *Deployment) ReportSinceLastUpdated() string {
if d.status.reported && !d.status.changed {
return ""
}
d.status.reported = true
return fmt.Sprintf("%s: %s", d, d.status)
if d.status.String() == "" {
return ""
}
var result strings.Builder
result.WriteString(fmt.Sprintf("%s %s: %s", tabHeader, d, d.status))
for _, p := range d.pods {
if p.Error() != nil {
result.WriteString(fmt.Sprintf("%s %s %s: %s\n", tab, tabHeader, p, p.Error()))
}
}
return result.String()
}

func (d *Deployment) cleanupStatus(msg string) string {
Expand Down Expand Up @@ -148,3 +186,28 @@ func isErrAndNotRetryAble(err error) bool {
}
return err != ErrKubectlConnection
}

func (d *Deployment) fetchPods(ctx context.Context) error {
timeoutContext, cancel := context.WithTimeout(ctx, defaultPodCheckDeadline)
defer cancel()
pods, err := d.podValidator.Run(timeoutContext)
if err != nil {
return err
}

newPods := map[string]validator.Resource{}
d.status.changed = false
for _, p := range pods {
originalPod, ok := d.pods[p.String()]
if !ok {
d.status.changed = true
event.ResourceStatusCheckEventCompleted(p.String(), p.Error())
} else if originalPod.StatusCode != p.StatusCode {
d.status.changed = true
event.ResourceStatusCheckEventCompleted(p.String(), p.Error())
}
newPods[p.String()] = p
}
d.pods = newPods
return nil
}
8 changes: 4 additions & 4 deletions pkg/skaffold/deploy/resource/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ func TestReportSinceLastUpdated(t *testing.T) {
description: "updating an error status",
message: "cannot pull image",
err: errors.New("cannot pull image"),
expected: "test-ns:deployment/test: cannot pull image",
expected: " - test-ns:deployment/test: cannot pull image",
},
{
description: "updating a non error status",
message: "waiting for container",
expected: "test-ns:deployment/test: waiting for container",
expected: " - test-ns:deployment/test: waiting for container",
},
}
for _, test := range tests {
Expand All @@ -222,7 +222,7 @@ func TestReportSinceLastUpdatedMultipleTimes(t *testing.T) {
description: "report first time should return status",
statuses: []string{"cannot pull image"},
reportStatusSeq: []bool{true},
expected: "test-ns:deployment/test: cannot pull image",
expected: " - test-ns:deployment/test: cannot pull image",
},
{
description: "report 2nd time should not return when same status",
Expand All @@ -234,7 +234,7 @@ func TestReportSinceLastUpdatedMultipleTimes(t *testing.T) {
description: "report called after multiple changes but last status was not changed.",
statuses: []string{"cannot pull image", "changed but not reported", "changed but not reported", "changed but not reported"},
reportStatusSeq: []bool{true, false, false, true},
expected: "test-ns:deployment/test: changed but not reported",
expected: " - test-ns:deployment/test: changed but not reported",
},
}
for _, test := range tests {
Expand Down
12 changes: 11 additions & 1 deletion pkg/skaffold/deploy/resource/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ limitations under the License.

package resource

import (
"github.com/GoogleContainerTools/skaffold/proto"
)

type Status struct {
err error
details string
errCode proto.StatusCode
changed bool
reported bool
}
Expand All @@ -27,6 +32,10 @@ func (rs Status) Error() error {
return rs.err
}

func (rs Status) ErrorCode() proto.StatusCode {
return rs.errCode
}

func (rs Status) String() string {
if rs.err != nil {
return rs.err.Error()
Expand All @@ -44,10 +53,11 @@ func (rs Status) Equal(other Status) bool {
return rs.err == other.err
}

func newStatus(msg string, err error) Status {
func newStatus(msg string, errCode proto.StatusCode, err error) Status {
return Status{
details: msg,
err: err,
errCode: errCode,
changed: true,
}
}
2 changes: 1 addition & 1 deletion pkg/skaffold/deploy/resource/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestString(t *testing.T) {
}
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
status := newStatus(test.details, test.err)
status := newStatus(test.details, 0, test.err)
t.CheckDeepEqual(test.expected, status.String())
})
}
Expand Down
23 changes: 17 additions & 6 deletions pkg/skaffold/deploy/status_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/GoogleContainerTools/skaffold/pkg/diag"
"github.com/GoogleContainerTools/skaffold/pkg/diag/validator"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy/resource"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
pkgkubernetes "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes"
Expand Down Expand Up @@ -85,18 +87,19 @@ func statusCheck(ctx context.Context, defaultLabeller *DefaultLabeller, runCtx *
wg.Add(1)
go func(r *resource.Deployment) {
defer wg.Done()
// keep updating the resource status until it fails/succeeds/times out
pollDeploymentStatus(ctx, runCtx, r)
rcCopy := c.markProcessed(r.Status().Error())
printStatusCheckSummary(out, r, rcCopy)
}(d)
}

// Retrieve pending resource states
// Retrieve pending deployments statuses
go func() {
printDeploymentStatus(ctx, out, deployments, deadline)
}()

// Wait for all deployment status to be fetched
// Wait for all deployment statuses to be fetched
wg.Wait()
return getSkaffoldDeployStatus(c)
}
Expand All @@ -109,15 +112,23 @@ func getDeployments(client kubernetes.Interface, ns string, l *DefaultLabeller,
return nil, fmt.Errorf("could not fetch deployments: %w", err)
}

deployments := make([]*resource.Deployment, 0, len(deps.Items))
for _, d := range deps.Items {
deployments := make([]*resource.Deployment, len(deps.Items))
for i, d := range deps.Items {
var deadline time.Duration
if d.Spec.ProgressDeadlineSeconds == nil || *d.Spec.ProgressDeadlineSeconds == kubernetesMaxDeadline {
deadline = deadlineDuration
} else {
deadline = time.Duration(*d.Spec.ProgressDeadlineSeconds) * time.Second
}
deployments = append(deployments, resource.NewDeployment(d.Name, d.Namespace, deadline))
pd := diag.New([]string{d.Namespace}).
WithLabel(RunIDLabel, l.Labels()[RunIDLabel]).
WithValidators([]validator.Validator{validator.NewPodValidator(client)})

for k, v := range d.Spec.Template.Labels {
pd = pd.WithLabel(k, v)
}

deployments[i] = resource.NewDeployment(d.Name, d.Namespace, deadline).WithValidator(pd)
}

return deployments, nil
Expand Down Expand Up @@ -208,7 +219,7 @@ func printStatus(deployments []*resource.Deployment, out io.Writer) bool {
allDone = false
if str := r.ReportSinceLastUpdated(); str != "" {
event.ResourceStatusCheckEventUpdated(r.String(), str)
fmt.Fprintln(out, tabHeader, trimNewLine(str))
fmt.Fprintln(out, trimNewLine(str))
}
}
return allDone
Expand Down
5 changes: 4 additions & 1 deletion pkg/skaffold/deploy/status_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
utilpointer "k8s.io/utils/pointer"

"github.com/GoogleContainerTools/skaffold/pkg/diag"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy/resource"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
Expand Down Expand Up @@ -214,7 +216,8 @@ func TestGetDeployments(t *testing.T) {
client := fakekubeclientset.NewSimpleClientset(objs...)
actual, err := getDeployments(client, "test", labeller, 200*time.Second)
t.CheckErrorAndDeepEqual(test.shouldErr, err, &test.expected, &actual,
cmp.AllowUnexported(resource.Deployment{}, resource.Status{}))
cmp.AllowUnexported(resource.Deployment{}, resource.Status{}),
cmpopts.IgnoreInterfaces(struct{ diag.Diagnose }{}))
})
}
}
Expand Down