Skip to content

Commit

Permalink
simplify done channel handling, fix other pr comments.
Browse files Browse the repository at this point in the history
Signed-off-by: Cory Bennett <cbennett@netflix.com>
  • Loading branch information
coryb committed Jul 11, 2020
1 parent 5e91dff commit 36636d3
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 82 deletions.
64 changes: 29 additions & 35 deletions executor/containerdexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type containerdExecutor struct {
cgroupParent string
dnsConfig *oci.DNSConfig
running map[string]chan error
mu *sync.Mutex
mu sync.Mutex
}

// New creates a new executor backed by connection to containerd API
Expand All @@ -42,18 +42,17 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb
os.RemoveAll(filepath.Join(root, "hosts"))
os.RemoveAll(filepath.Join(root, "resolv.conf"))

return containerdExecutor{
return &containerdExecutor{
client: client,
root: root,
networkProviders: networkProviders,
cgroupParent: cgroup,
dnsConfig: dnsConfig,
running: make(map[string]chan error),
mu: &sync.Mutex{},
}
}

func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) {
func (w *containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) {
if id == "" {
id = identity.NewID()
}
Expand All @@ -67,6 +66,7 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
w.mu.Lock()
delete(w.running, id)
w.mu.Unlock()
done <- err
close(done)
if started != nil {
startedOnce.Do(func() {
Expand All @@ -79,25 +79,25 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount

resolvConf, err := oci.GetResolvConf(ctx, w.root, nil, w.dnsConfig)
if err != nil {
return sendErr(done, err)
return err
}

hostsFile, clean, err := oci.GetHostsFile(ctx, w.root, meta.ExtraHosts, nil)
if err != nil {
return sendErr(done, err)
return err
}
if clean != nil {
defer clean()
}

mountable, err := root.Mount(ctx, false)
if err != nil {
return sendErr(done, err)
return err
}

rootMounts, release, err := mountable.Mount()
if err != nil {
return sendErr(done, err)
return err
}
if release != nil {
defer release()
Expand All @@ -109,12 +109,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
lm := snapshot.LocalMounterWithMounts(rootMounts)
rootfsPath, err := lm.Mount()
if err != nil {
return sendErr(done, err)
return err
}
uid, gid, sgids, err = oci.GetUser(ctx, rootfsPath, meta.User)
if err != nil {
lm.Unmount()
return sendErr(done, err)
return err
}

identity := idtools.Identity{
Expand All @@ -125,12 +125,12 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
newp, err := fs.RootPath(rootfsPath, meta.Cwd)
if err != nil {
lm.Unmount()
return sendErr(done, errors.Wrapf(err, "working dir %s points to invalid target", newp))
return errors.Wrapf(err, "working dir %s points to invalid target", newp)
}
if _, err := os.Stat(newp); err != nil {
if err := idtools.MkdirAllAndChown(newp, 0755, identity); err != nil {
lm.Unmount()
return sendErr(done, errors.Wrapf(err, "failed to create working directory %s", newp))
return errors.Wrapf(err, "failed to create working directory %s", newp)
}
}

Expand All @@ -139,11 +139,11 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount

provider, ok := w.networkProviders[meta.NetMode]
if !ok {
return sendErr(done, errors.Errorf("unknown network mode %s", meta.NetMode))
return errors.Errorf("unknown network mode %s", meta.NetMode)
}
namespace, err := provider.New()
if err != nil {
return sendErr(done, err)
return err
}
defer namespace.Close()

Expand All @@ -169,21 +169,20 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
processMode := oci.ProcessSandbox // FIXME(AkihiroSuda)
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, processMode, nil, opts...)
if err != nil {
return sendErr(done, err)
return err
}
defer cleanup()

container, err := w.client.NewContainer(ctx, id,
containerd.WithSpec(spec),
)
if err != nil {
return sendErr(done, err)
return err
}

defer func() {
if err1 := container.Delete(context.TODO()); err == nil && err1 != nil {
err = errors.Wrapf(err1, "failed to delete container %s", id)
sendErr(done, err)
}
}()

Expand All @@ -194,17 +193,16 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount

task, err := container.NewTask(ctx, cio.NewCreator(cioOpts...), containerd.WithRootFS(rootMounts))
if err != nil {
return sendErr(done, err)
return err
}
defer func() {
if _, err1 := task.Delete(context.TODO()); err == nil && err1 != nil {
err = errors.Wrapf(err1, "failed to delete task %s", id)
sendErr(done, err)
}
}()

if err := task.Start(ctx); err != nil {
return sendErr(done, err)
return err
}

if started != nil {
Expand All @@ -214,7 +212,7 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
}
statusCh, err := task.Wait(context.Background())
if err != nil {
return sendErr(done, err)
return err
}

var cancel func()
Expand Down Expand Up @@ -242,30 +240,31 @@ func (w containerdExecutor) Run(ctx context.Context, id string, root cache.Mount
err = errors.Wrap(ctx.Err(), err.Error())
default:
}
return sendErr(done, err)
return err
}
return nil
}
}
}

func (w containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
func (w *containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
meta := process.Meta

// first verify the container is running, if we get an error assume the container
// is in the process of being created and check again every 100ms or until
// context is canceled.

w.mu.Lock()
done, ok := w.running[id]
w.mu.Unlock()
if !ok {
return errors.Errorf("container %s not found", id)
}

var container containerd.Container
var task containerd.Task
for {
w.mu.Lock()
done, ok := w.running[id]
w.mu.Unlock()

if !ok {
return errors.Errorf("container %s not found", id)
}

if container == nil {
container, _ = w.client.LoadContainer(ctx, id)
}
Expand Down Expand Up @@ -332,8 +331,3 @@ func (w containerdExecutor) Exec(ctx context.Context, id string, process executo
}
return taskProcess.Start(ctx)
}

func sendErr(c chan error, err error) error {
c <- err
return err
}
Loading

0 comments on commit 36636d3

Please sign in to comment.