Skip to content

Commit

Permalink
Merge pull request #1786 from dgageot/fix-race-events
Browse files Browse the repository at this point in the history
Fix race in event logs
  • Loading branch information
dgageot authored Mar 12, 2019
2 parents eec4192 + 697ab63 commit b0f4b03
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 53 deletions.
103 changes: 69 additions & 34 deletions pkg/skaffold/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ package event

import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event/proto"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/version"
"github.com/golang/protobuf/ptypes"
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/golang/protobuf/ptypes"
)

const (
Expand All @@ -46,27 +46,85 @@ var (
cli proto.SkaffoldServiceClient // for plugin RPC connections
)

type eventLog []proto.LogEntry

type eventHandler struct {
eventLog
eventLog []proto.LogEntry
logLock sync.Mutex

listeners []chan proto.LogEntry
state *proto.State
state proto.State
stateLock sync.Mutex

logLock *sync.Mutex
stateLock *sync.Mutex
listeners []chan proto.LogEntry
}

func (ev *eventHandler) RegisterListener(listener chan proto.LogEntry) {
ev.listeners = append(ev.listeners, listener)
}

func (ev *eventHandler) getState() proto.State {
ev.stateLock.Lock()
// Deep copy
buf, _ := json.Marshal(ev.state)
ev.stateLock.Unlock()

var state proto.State
json.Unmarshal(buf, &state)

return state
}

func (ev *eventHandler) logEvent(entry proto.LogEntry) {
ev.logLock.Lock()

for _, c := range ev.listeners {
c <- entry
}
ev.eventLog = append(ev.eventLog, entry)

ev.logLock.Unlock()
}

func (ev *eventHandler) forEachEvent(callback func(*proto.LogEntry) error) error {
c := make(chan proto.LogEntry)

ev.logLock.Lock()

oldEvents := make([]proto.LogEntry, len(ev.eventLog))
copy(oldEvents, ev.eventLog)
ev.RegisterListener(c)

ev.logLock.Unlock()

for i := range oldEvents {
if err := callback(&oldEvents[i]); err != nil {
return err
}
}

for {
entry := <-c
if err := callback(&entry); err != nil {
return err
}
}
}

func emptyState(build *latest.BuildConfig) proto.State {
builds := map[string]string{}
if build != nil {
for _, a := range build.Artifacts {
builds[a.ImageName] = NotStarted
}
}

return proto.State{
BuildState: &proto.BuildState{
Artifacts: builds,
},
DeployState: &proto.DeployState{
Status: NotStarted,
},
ForwardedPorts: make(map[string]*proto.PortEvent),
}
}

// InitializeState instantiates the global state of the skaffold runner, as well as the event log.
Expand All @@ -76,29 +134,10 @@ func InitializeState(build *latest.BuildConfig, deploy *latest.DeployConfig, opt
var err error
serverShutdown := func() error { return nil }
once.Do(func() {
builds := map[string]string{}
deploys := map[string]string{}
if build != nil {
for _, a := range build.Artifacts {
builds[a.ImageName] = NotStarted
deploys[a.ImageName] = NotStarted
}
}
state := &proto.State{
BuildState: &proto.BuildState{
Artifacts: builds,
},
DeployState: &proto.DeployState{
Status: NotStarted,
},
ForwardedPorts: make(map[string]*proto.PortEvent),
}
ev = &eventHandler{
eventLog: eventLog{},
state: state,
logLock: &sync.Mutex{},
stateLock: &sync.Mutex{},
state: emptyState(build),
}

if opts.EnableRPC {
serverShutdown, err = newStatusServer(opts.RPCPort)
if err != nil {
Expand Down Expand Up @@ -175,13 +214,10 @@ func handle(event *proto.Event) {
return
}

ev.logLock.Lock()
ev.logEvent(*logEntry)
ev.logLock.Unlock()
}

func LogSkaffoldMetadata(info *version.Info) {
ev.logLock.Lock()
ev.logEvent(proto.LogEntry{
Timestamp: ptypes.TimestampNow(),
Event: &proto.Event{
Expand All @@ -192,5 +228,4 @@ func LogSkaffoldMetadata(info *version.Info) {
},
},
})
ev.logLock.Unlock()
}
72 changes: 72 additions & 0 deletions pkg/skaffold/event/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2019 The Skaffold Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package event

import (
"errors"
"sync/atomic"
"testing"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event/proto"
"github.com/GoogleContainerTools/skaffold/testutil"
)

func TestGetLogEvents(t *testing.T) {
for step := 0; step < 100000; step++ {
ev := &eventHandler{}

ev.logEvent(proto.LogEntry{Entry: "OLD1"})

var done int32
go func() {
ev.logEvent(proto.LogEntry{Entry: "FRESH"})

for atomic.LoadInt32(&done) == 0 {
ev.logEvent(proto.LogEntry{Entry: "POISON PILL"})
}
}()

received := 0
ev.forEachEvent(func(e *proto.LogEntry) error {
if e.Entry == "POISON PILL" {
return errors.New("Done")
}

received++
return nil
})
atomic.StoreInt32(&done, int32(1))

if received != 2 {
t.Fatalf("Expected %d events, Got %d (Step: %d)", 2, received, step)
}
}
}

func TestGetState(t *testing.T) {
ev := &eventHandler{
state: emptyState(nil),
}

ev.stateLock.Lock()
ev.state.BuildState.Artifacts["img"] = Complete
ev.stateLock.Unlock()

state := ev.getState()

testutil.CheckDeepEqual(t, Complete, state.BuildState.Artifacts["img"])
}
22 changes: 3 additions & 19 deletions pkg/skaffold/event/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,21 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event/proto"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"

empty "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"google.golang.org/grpc"
)

type server struct{}

func (s *server) GetState(context.Context, *empty.Empty) (*proto.State, error) {
return ev.state, nil
state := ev.getState()
return &state, nil
}

func (s *server) EventLog(stream proto.SkaffoldService_EventLogServer) error {
ev.logLock.Lock()
for _, entry := range ev.eventLog {
if err := stream.Send(&entry); err != nil {
return err
}
}
ev.logLock.Unlock()
c := make(chan proto.LogEntry)
ev.RegisterListener(c)
var entry proto.LogEntry
for {
entry = <-c
if err := stream.Send(&entry); err != nil {
return err
}
}
return ev.forEachEvent(stream.Send)
}

func (s *server) Handle(ctx context.Context, event *proto.Event) (*empty.Empty, error) {
Expand Down

0 comments on commit b0f4b03

Please sign in to comment.