Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit TaskEvent messages for DevLoop, Build, and Deploy phases #5637

Merged
merged 5 commits into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/skaffold/runner/build_deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"time"

eventV2 "github.com/GoogleContainerTools/skaffold/pkg/skaffold/event/v2"
"github.com/sirupsen/logrus"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build"
Expand All @@ -35,17 +36,21 @@ import (

// Build builds a list of artifacts.
func (r *SkaffoldRunner) Build(ctx context.Context, out io.Writer, artifacts []*latest.Artifact) ([]build.Artifact, error) {
eventV2.TaskInProgress("Build", r.devIteration)

// Use tags directly from the Kubernetes manifests.
if r.runCtx.DigestSource() == noneDigestSource {
return []build.Artifact{}, nil
}

if err := checkWorkspaces(artifacts); err != nil {
eventV2.TaskFailed("Build", r.devIteration, err)
return nil, err
}

tags, err := r.imageTags(ctx, out, artifacts)
if err != nil {
eventV2.TaskFailed("Build", r.devIteration, err)
return nil, err
}

Expand Down Expand Up @@ -78,6 +83,7 @@ func (r *SkaffoldRunner) Build(ctx context.Context, out io.Writer, artifacts []*
return bRes, nil
})
if err != nil {
eventV2.TaskFailed("Build", r.devIteration, err)
return nil, err
}

Expand All @@ -87,6 +93,7 @@ func (r *SkaffoldRunner) Build(ctx context.Context, out io.Writer, artifacts []*
// Make sure all artifacts are redeployed. Not only those that were just built.
r.builds = build.MergeWithPreviousBuilds(bRes, r.builds)

eventV2.TaskSucceeded("Build", r.devIteration)
return bRes, nil
}

Expand All @@ -101,6 +108,8 @@ func (r *SkaffoldRunner) Test(ctx context.Context, out io.Writer, artifacts []bu

// DeployAndLog deploys a list of already built artifacts and optionally show the logs.
func (r *SkaffoldRunner) DeployAndLog(ctx context.Context, out io.Writer, artifacts []build.Artifact) error {
eventV2.TaskInProgress("Deploy", r.devIteration)

// Update which images are logged.
r.addTagsToPodSelector(artifacts)

Expand All @@ -111,6 +120,7 @@ func (r *SkaffoldRunner) DeployAndLog(ctx context.Context, out io.Writer, artifa
logger.SetSince(time.Now())
// First deploy
if err := r.Deploy(ctx, out, artifacts); err != nil {
eventV2.TaskFailed("Deploy", r.devIteration, err)
return err
}

Expand All @@ -123,6 +133,7 @@ func (r *SkaffoldRunner) DeployAndLog(ctx context.Context, out io.Writer, artifa

// Start printing the logs after deploy is finished
if err := logger.Start(ctx, r.runCtx.GetNamespaces()); err != nil {
eventV2.TaskFailed("Deploy", r.devIteration, err)
return fmt.Errorf("starting logger: %w", err)
}

Expand All @@ -131,6 +142,7 @@ func (r *SkaffoldRunner) DeployAndLog(ctx context.Context, out io.Writer, artifa
<-ctx.Done()
}

eventV2.TaskSucceeded("Deploy", r.devIteration)
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/skaffold/runner/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"time"

eventV2 "github.com/GoogleContainerTools/skaffold/pkg/skaffold/event/v2"
"github.com/sirupsen/logrus"
"k8s.io/client-go/tools/clientcmd/api"

Expand Down Expand Up @@ -82,10 +83,12 @@ See https://skaffold.dev/docs/pipeline-stages/taggers/#how-tagging-works`)
}

event.DeployInProgress()
eventV2.TaskInProgress("Deploy", r.devIteration)
namespaces, err := r.deployer.Deploy(ctx, deployOut, artifacts)
postDeployFn()
if err != nil {
event.DeployFailed(err)
eventV2.TaskFailed("Deploy", r.devIteration, err)
return err
}

Expand All @@ -97,6 +100,7 @@ See https://skaffold.dev/docs/pipeline-stages/taggers/#how-tagging-works`)
return err
}
event.DeployComplete()
eventV2.TaskSucceeded("Deploy", r.devIteration)
r.runCtx.UpdateNamespaces(namespaces)
sErr := r.performStatusCheck(ctx, statusCheckOut)
return sErr
Expand Down
20 changes: 19 additions & 1 deletion pkg/skaffold/runner/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
sErrors "github.com/GoogleContainerTools/skaffold/pkg/skaffold/errors"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
eventV2 "github.com/GoogleContainerTools/skaffold/pkg/skaffold/event/v2"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/filemon"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/instrumentation"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes"
Expand Down Expand Up @@ -68,6 +69,7 @@ func (r *SkaffoldRunner) doDev(ctx context.Context, out io.Writer, logger *kuber
defer r.monitor.Reset()
defer r.listener.LogWatchToUser(out)
event.DevLoopInProgress(r.devIteration)
eventV2.TaskInProgress("DevLoop", r.devIteration)
defer func() { r.devIteration++ }()

meterUpdated := false
Expand All @@ -87,6 +89,7 @@ func (r *SkaffoldRunner) doDev(ctx context.Context, out io.Writer, logger *kuber
logrus.Warnln("Skipping deploy due to sync error:", err)
fileSyncFailed(fileCount, s.Image, err)
event.DevLoopFailedInPhase(r.devIteration, sErrors.Sync, err)
eventV2.TaskFailed("DevLoop", r.devIteration, err)
return nil
}

Expand All @@ -111,6 +114,7 @@ func (r *SkaffoldRunner) doDev(ctx context.Context, out io.Writer, logger *kuber
if err != nil {
logrus.Warnln("Skipping test and deploy due to build error:", err)
event.DevLoopFailedInPhase(r.devIteration, sErrors.Build, err)
eventV2.TaskFailed("DevLoop", r.devIteration, err)
return nil
}
needsTest = true
Expand All @@ -132,6 +136,7 @@ func (r *SkaffoldRunner) doDev(ctx context.Context, out io.Writer, logger *kuber
logrus.Warnln("Skipping deploy due to test error:", err)
}
event.DevLoopFailedInPhase(r.devIteration, sErrors.Test, err)
eventV2.TaskFailed("DevLoop", r.devIteration, err)
return nil
}
}
Expand All @@ -150,13 +155,15 @@ func (r *SkaffoldRunner) doDev(ctx context.Context, out io.Writer, logger *kuber
if err := r.Deploy(ctx, out, r.builds); err != nil {
logrus.Warnln("Skipping deploy due to error:", err)
event.DevLoopFailedInPhase(r.devIteration, sErrors.Deploy, err)
eventV2.TaskFailed("DevLoop", r.devIteration, err)
return nil
}
if err := forwarderManager.Start(ctx, r.runCtx.GetNamespaces()); err != nil {
logrus.Warnln("Port forwarding failed:", err)
}
}
event.DevLoopComplete(r.devIteration)
eventV2.TaskSucceeded("DevLoop", r.devIteration)
logger.Unmute()
return nil
}
Expand All @@ -165,6 +172,7 @@ func (r *SkaffoldRunner) doDev(ctx context.Context, out io.Writer, logger *kuber
// config until interrupted by the user.
func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*latest.Artifact) error {
event.DevLoopInProgress(r.devIteration)
eventV2.TaskInProgress("DevLoop", r.devIteration)
defer func() { r.devIteration++ }()
g := getTransposeGraph(artifacts)
// Watch artifacts
Expand Down Expand Up @@ -200,6 +208,7 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
},
); err != nil {
event.DevLoopFailedWithErrorCode(r.devIteration, proto.StatusCode_DEVINIT_REGISTER_BUILD_DEPS, err)
eventV2.TaskFailed("DevLoop", r.devIteration, err)
return fmt.Errorf("watching files for artifact %q: %w", artifact.ImageName, err)
}
}
Expand All @@ -211,6 +220,7 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
func(filemon.Events) { r.changeSet.needsRetest = true },
); err != nil {
event.DevLoopFailedWithErrorCode(r.devIteration, proto.StatusCode_DEVINIT_REGISTER_TEST_DEPS, err)
eventV2.TaskFailed("DevLoop", r.devIteration, err)
return fmt.Errorf("watching test files: %w", err)
}

Expand All @@ -220,6 +230,7 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
func(filemon.Events) { r.changeSet.needsRedeploy = true },
); err != nil {
event.DevLoopFailedWithErrorCode(r.devIteration, proto.StatusCode_DEVINIT_REGISTER_DEPLOY_DEPS, err)
eventV2.TaskFailed("DevLoop", r.devIteration, err)
return fmt.Errorf("watching files for deployer: %w", err)
}

Expand All @@ -229,6 +240,7 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
func(filemon.Events) { r.changeSet.needsReload = true },
); err != nil {
event.DevLoopFailedWithErrorCode(r.devIteration, proto.StatusCode_DEVINIT_REGISTER_CONFIG_DEP, err)
eventV2.TaskFailed("DevLoop", r.devIteration, err)
return fmt.Errorf("watching skaffold configuration %q: %w", r.runCtx.ConfigurationFile(), err)
}

Expand All @@ -237,19 +249,22 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
// Init Sync State
if err := sync.Init(ctx, artifacts); err != nil {
event.DevLoopFailedWithErrorCode(r.devIteration, proto.StatusCode_SYNC_INIT_ERROR, err)
eventV2.TaskFailed("DevLoop", r.devIteration, err)
return fmt.Errorf("exiting dev mode because initializing sync state failed: %w", err)
}

// First build
bRes, err := r.Build(ctx, out, artifacts)
if err != nil {
event.DevLoopFailedInPhase(r.devIteration, sErrors.Build, err)
eventV2.TaskFailed("DevLoop", r.devIteration, err)
return fmt.Errorf("exiting dev mode because first build failed: %w", err)
}
// First test
if !r.runCtx.SkipTests() {
if err = r.Test(ctx, out, bRes); err != nil {
event.DevLoopFailedInPhase(r.devIteration, sErrors.Build, err)
eventV2.TaskFailed("DevLoop", r.devIteration, err)
return fmt.Errorf("exiting dev mode because test failed after first build: %w", err)
}
}
Expand All @@ -266,6 +281,7 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
// First deploy
if err := r.Deploy(ctx, out, r.builds); err != nil {
event.DevLoopFailedInPhase(r.devIteration, sErrors.Deploy, err)
eventV2.TaskFailed("DevLoop", r.devIteration, err)
return fmt.Errorf("exiting dev mode because first deploy failed: %w", err)
}

Expand All @@ -285,7 +301,9 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la

color.Yellow.Fprintln(out, "Press Ctrl+C to exit")

event.DevLoopComplete(0)
event.DevLoopComplete(r.devIteration)
eventV2.TaskSucceeded("DevLoop", r.devIteration)
r.devIteration++
return r.listener.WatchForChanges(ctx, out, func() error {
return r.doDev(ctx, out, logger, forwarderManager)
})
Expand Down