Skip to content

Commit

Permalink
Lock before reading the state
Browse files Browse the repository at this point in the history
And return a deep copy of the state

Signed-off-by: David Gageot <david@gageot.net>
  • Loading branch information
dgageot committed Mar 12, 2019
1 parent eeeeb96 commit 5ad31c2
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 20 deletions.
53 changes: 34 additions & 19 deletions pkg/skaffold/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package event

import (
"context"
"encoding/json"
"fmt"
"sync"

Expand Down Expand Up @@ -49,7 +50,7 @@ type eventHandler struct {
eventLog []proto.LogEntry
logLock sync.Mutex

state *proto.State
state proto.State
stateLock sync.Mutex

listeners []chan proto.LogEntry
Expand All @@ -59,6 +60,18 @@ func (ev *eventHandler) RegisterListener(listener chan proto.LogEntry) {
ev.listeners = append(ev.listeners, listener)
}

func (ev *eventHandler) getState() proto.State {
ev.stateLock.Lock()
// Deep copy
buf, _ := json.Marshal(ev.state)
ev.stateLock.Unlock()

var state proto.State
json.Unmarshal(buf, &state)

return state
}

func (ev *eventHandler) logEvent(entry proto.LogEntry) {
ev.logLock.Lock()

Expand Down Expand Up @@ -95,32 +108,34 @@ func (ev *eventHandler) forEachEvent(callback func(*proto.LogEntry) error) error
}
}

func emptyState(build *latest.BuildConfig) proto.State {
builds := map[string]string{}
if build != nil {
for _, a := range build.Artifacts {
builds[a.ImageName] = NotStarted
}
}

return proto.State{
BuildState: &proto.BuildState{
Artifacts: builds,
},
DeployState: &proto.DeployState{
Status: NotStarted,
},
ForwardedPorts: make(map[string]*proto.PortEvent),
}
}

// InitializeState instantiates the global state of the skaffold runner, as well as the event log.
// It returns a shutdown callback for tearing down the grpc server, which the runner is responsible for calling.
// This function can only be called once.
func InitializeState(build *latest.BuildConfig, deploy *latest.DeployConfig, opts *config.SkaffoldOptions) (func() error, error) {
var err error
serverShutdown := func() error { return nil }
once.Do(func() {
builds := map[string]string{}
deploys := map[string]string{}
if build != nil {
for _, a := range build.Artifacts {
builds[a.ImageName] = NotStarted
deploys[a.ImageName] = NotStarted
}
}

ev = &eventHandler{
state: &proto.State{
BuildState: &proto.BuildState{
Artifacts: builds,
},
DeployState: &proto.DeployState{
Status: NotStarted,
},
ForwardedPorts: make(map[string]*proto.PortEvent),
},
state: emptyState(build),
}

if opts.EnableRPC {
Expand Down
15 changes: 15 additions & 0 deletions pkg/skaffold/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event/proto"
"github.com/GoogleContainerTools/skaffold/testutil"
)

func TestGetLogEvents(t *testing.T) {
Expand Down Expand Up @@ -55,3 +56,17 @@ func TestGetLogEvents(t *testing.T) {
}
}
}

func TestGetState(t *testing.T) {
ev := &eventHandler{
state: emptyState(nil),
}

ev.stateLock.Lock()
ev.state.BuildState.Artifacts["img"] = Complete
ev.stateLock.Unlock()

state := ev.getState()

testutil.CheckDeepEqual(t, Complete, state.BuildState.Artifacts["img"])
}
3 changes: 2 additions & 1 deletion pkg/skaffold/event/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (
type server struct{}

func (s *server) GetState(context.Context, *empty.Empty) (*proto.State, error) {
return ev.state, nil
state := ev.getState()
return &state, nil
}

func (s *server) EventLog(stream proto.SkaffoldService_EventLogServer) error {
Expand Down

0 comments on commit 5ad31c2

Please sign in to comment.