Pārlūkot izejas kodu

[WIP] cgroupv2: support start with cgroupv2

Miao Wang 4 gadi atpakaļ
vecāks
revīzija
0f05c69c36
3 mainītis faili ar 111 papildinājumiem un 39 dzēšanām
  1. 30 19
      worker/cgroup.go
  2. 27 14
      worker/cgroup_test.go
  3. 54 6
      worker/runner.go

+ 30 - 19
worker/cgroup.go

@@ -3,6 +3,7 @@ package worker
 import (
 	"errors"
 	"fmt"
+	"io/ioutil"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -25,38 +26,47 @@ type cgroupHook struct {
 	cgMgrV2   *cgv2.Manager
 }
 
+type execCmd string
+
+const (
+	cmdCont     execCmd = "cont"
+	cmdAbrt     execCmd = "abrt"
+)
+
 func init () {
 	reexec.Register("tunasync-exec", waitExec)
 }
 
 func waitExec () {
-	binary, lookErr := exec.LookPath(os.Args[1])
-	if lookErr != nil {
-		panic(lookErr)
+	binary, err := exec.LookPath(os.Args[1])
+	if err != nil {
+		panic(err)
 	}
 
 	pipe := os.NewFile(3, "pipe")
 	if pipe != nil {
-		for {
-			tmpBytes := make([]byte, 1)
-			nRead, err := pipe.Read(tmpBytes)
+		if _, err := pipe.Stat(); err == nil {
+			cmdBytes, err := ioutil.ReadAll(pipe)
 			if err != nil {
-				break
+				panic(err)
 			}
-			if nRead == 0 {
-				break
+			if err := pipe.Close(); err != nil {
+			}
+			cmd := execCmd(string(cmdBytes))
+			switch cmd {
+				case cmdAbrt:
+					fallthrough
+				default:
+					panic("Exited on request")
+				case cmdCont:
 			}
-		}
-		err := pipe.Close()
-		if err != nil {
 		}
 	}
 
 	args := os.Args[1:]
 	env := os.Environ()
-	execErr := syscall.Exec(binary, args, env)
-	if execErr != nil {
-		panic(execErr)
+	if err := syscall.Exec(binary, args, env); err != nil {
+		panic(err)
 	}
 	panic("Exec failed.")
 }
@@ -241,6 +251,7 @@ func newCgroupHook(p mirrorProvider, cfg cgroupConfig, memLimit MemBytes) *cgrou
 			provider: p,
 		},
 		cgCfg: cfg,
+		memLimit: memLimit,
 	}
 }
 
@@ -255,7 +266,7 @@ func (c *cgroupHook) preExec() error {
 				},
 			}
 		}
-		subMgr, err := c.cgMgrV2.NewChild(c.provider.Name(), resSet)
+		subMgr, err := c.cgCfg.cgMgrV2.NewChild(c.provider.Name(), resSet)
 		if err != nil {
 			logger.Errorf("Failed to create cgroup for task %s: %s", c.provider.Name(), err.Error())
 			return err
@@ -263,15 +274,15 @@ func (c *cgroupHook) preExec() error {
 		c.cgMgrV2 = subMgr
 	} else {
 		logger.Debugf("Creating v1 cgroup for task %s", c.provider.Name())
-		var resSet *contspecs.LinuxResources
+		var resSet contspecs.LinuxResources
 		if c.memLimit != 0 {
-			resSet = &contspecs.LinuxResources {
+			resSet = contspecs.LinuxResources {
 				Memory: &contspecs.LinuxMemory{
 					Limit: func(i int64) *int64 { return &i }(c.memLimit.Value()),
 				},
 			}
 		}
-		subMgr, err := c.cgMgrV1.New(c.provider.Name(), resSet)
+		subMgr, err := c.cgCfg.cgMgrV1.New(c.provider.Name(), &resSet)
 		if err != nil {
 			logger.Errorf("Failed to create cgroup for task %s: %s", c.provider.Name(), err.Error())
 			return err

+ 27 - 14
worker/cgroup_test.go

@@ -10,10 +10,15 @@ import (
 	"time"
 	cgv1 "github.com/containerd/cgroups"
 	units "github.com/docker/go-units"
+	"github.com/moby/moby/pkg/reexec"
 
 	. "github.com/smartystreets/goconvey/convey"
 )
 
+func init() {
+	reexec.Init()
+}
+
 func TestCgroup(t *testing.T) {
 	Convey("Cgroup Should Work", t, func(ctx C) {
 		tmpDir, err := ioutil.TempDir("", "tunasync")
@@ -77,14 +82,15 @@ sleep 30
 		cgcf := cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"}
 		err = initCgroup(&cgcf)
 		So(err, ShouldBeNil)
+		if cgcf.isUnified {
+			So(cgcf.cgMgrV2, ShouldNotBeNil)
+		} else {
+			So(cgcf.cgMgrV1, ShouldNotBeNil)
+		}
 		cg := newCgroupHook(provider, cgcf, 0)
 		provider.AddHook(cg)
 
 		err = cg.preExec()
-		if err != nil {
-			logger.Errorf("Failed to create cgroup")
-			return
-		}
 		So(err, ShouldBeNil)
 
 		go func() {
@@ -140,20 +146,27 @@ sleep 30
 		cgcf := cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"}
 		err = initCgroup(&cgcf)
 		So(err, ShouldBeNil)
+		if cgcf.isUnified {
+			So(cgcf.cgMgrV2, ShouldNotBeNil)
+		} else {
+			So(cgcf.cgMgrV1, ShouldNotBeNil)
+		}
 		cg := newCgroupHook(provider, cgcf, 512 * units.MiB)
 		provider.AddHook(cg)
 
 		err = cg.preExec()
-		if err != nil {
-			logger.Errorf("Failed to create cgroup")
-			return
-		}
-		So(cg.cgMgrV1, ShouldNotBeNil)
-		for _, subsys := range(cg.cgMgrV1.Subsystems()) {
-			if subsys.Name() == cgv1.Memory {
-				memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, "memory", cgcf.Group, provider.Name(), "memory.limit_in_bytes"))
-				So(err, ShouldBeNil)
-				So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))
+		So(err, ShouldBeNil)
+		if cgcf.isUnified {
+			memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, cgcf.Group, provider.Name(), "memory.max"))
+			So(err, ShouldBeNil)
+			So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))
+		} else {
+			for _, subsys := range(cg.cgMgrV1.Subsystems()) {
+				if subsys.Name() == cgv1.Memory {
+					memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, "memory", cgcf.Group, provider.Name(), "memory.limit_in_bytes"))
+					So(err, ShouldBeNil)
+					So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))
+				}
 			}
 		}
 		cg.postExec()

+ 54 - 6
worker/runner.go

@@ -12,6 +12,8 @@ import (
 
 	"github.com/codeskyblue/go-sh"
 	"golang.org/x/sys/unix"
+	"github.com/moby/moby/pkg/reexec"
+	cgv1 "github.com/containerd/cgroups"
 )
 
 // runner is to run os commands giving command line, env and log file
@@ -70,11 +72,7 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
 		cmd = exec.Command(c, args...)
 
 	} else if provider.Cgroup() != nil {
-		//c := "cgexec"
-		//args := []string{"-g", provider.Cgroup().Cgroup()}
-		//args = append(args, cmdAndArgs...)
-		//cmd = exec.Command(c, args...)
-		cmd = exec.Command(cmdAndArgs[0], cmdAndArgs[1:]...)
+		cmd = reexec.Command(append([]string{"tunasync-exec"}, cmdAndArgs...)...)
 
 	} else {
 		if len(cmdAndArgs) == 1 {
@@ -109,9 +107,59 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
 }
 
 func (c *cmdJob) Start() error {
+	cg := c.provider.Cgroup()
+	var (
+		pipeR *os.File
+		pipeW *os.File
+	)
+	if cg != nil {
+		logger.Debugf("Preparing cgroup sync pipes for job %s", c.provider.Name())
+		var err error
+		pipeR, pipeW, err = os.Pipe();
+		if err != nil {
+			return err
+		}
+		c.cmd.ExtraFiles = []*os.File{pipeR}
+		defer pipeR.Close()
+		defer pipeW.Close()
+	}
+
 	logger.Debugf("Command start: %v", c.cmd.Args)
 	c.finished = make(chan empty, 1)
-	return c.cmd.Start()
+
+	if err := c.cmd.Start(); err != nil {
+		return err
+	}
+	if cg != nil {
+		if err := pipeR.Close(); err != nil {
+			return err
+		}
+		if c.cmd == nil || c.cmd.Process == nil {
+			return errProcessNotStarted
+		}
+		pid := c.cmd.Process.Pid
+		if cg.cgCfg.isUnified {
+			if err := cg.cgMgrV2.AddProc(uint64(pid)); err != nil{
+				if errors.Is(err, syscall.ESRCH) {
+					logger.Infof("Write pid %d to cgroup failed: process vanished, ignoring")
+				} else {
+					return err
+				}
+			}
+		} else {
+			if err := cg.cgMgrV1.Add(cgv1.Process{Pid: pid}); err != nil{
+				if errors.Is(err, syscall.ESRCH) {
+					logger.Infof("Write pid %d to cgroup failed: process vanished, ignoring")
+				} else {
+					return err
+				}
+			}
+		}
+		if _, err := pipeW.WriteString(string(cmdCont)); err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
 func (c *cmdJob) Wait() error {