浏览代码

[WIP] cgroupv2: add cgroup add and del

Miao Wang 4 年之前
父节点
当前提交
2c4d2d6ae0
共有 3 个文件被更改,包括 88 次插入66 次删除
  1. 1 1
      go.mod
  2. 82 61
      worker/cgroup.go
  3. 5 4
      worker/runner.go

+ 1 - 1
go.mod

@@ -21,7 +21,7 @@ require (
 	github.com/gomodule/redigo v1.8.2 // indirect
 	github.com/gomodule/redigo v1.8.2 // indirect
 	github.com/imdario/mergo v0.3.9
 	github.com/imdario/mergo v0.3.9
 	github.com/moby/moby v20.10.7+incompatible
 	github.com/moby/moby v20.10.7+incompatible
-	github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 // indirect
+	github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417
 	github.com/pkg/errors v0.9.1
 	github.com/pkg/errors v0.9.1
 	github.com/pkg/profile v1.4.0
 	github.com/pkg/profile v1.4.0
 	github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46
 	github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46

+ 82 - 61
worker/cgroup.go

@@ -1,19 +1,16 @@
 package worker
 package worker
 
 
 import (
 import (
-	"bufio"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
 	"os"
 	"os"
 	"os/exec"
 	"os/exec"
 	"path/filepath"
 	"path/filepath"
-	"strconv"
 	"syscall"
 	"syscall"
 	"time"
 	"time"
 
 
 	"golang.org/x/sys/unix"
 	"golang.org/x/sys/unix"
 
 
-	"github.com/codeskyblue/go-sh"
 	"github.com/moby/moby/pkg/reexec"
 	"github.com/moby/moby/pkg/reexec"
 	cgv1 "github.com/containerd/cgroups"
 	cgv1 "github.com/containerd/cgroups"
 	cgv2 "github.com/containerd/cgroups/v2"
 	cgv2 "github.com/containerd/cgroups/v2"
@@ -22,11 +19,10 @@ import (
 
 
 type cgroupHook struct {
 type cgroupHook struct {
 	emptyHook
 	emptyHook
-	basePath	string
-	baseGroup string
-	created	 bool
-	subsystem string
-	memLimit	MemBytes
+	cgCfg     cgroupConfig
+	memLimit  MemBytes
+	cgMgrV1   cgv1.Cgroup
+	cgMgrV2   *cgv2.Manager
 }
 }
 
 
 func init () {
 func init () {
@@ -240,45 +236,47 @@ func initCgroup(cfg *cgroupConfig) (error) {
 }
 }
 
 
 func newCgroupHook(p mirrorProvider, cfg cgroupConfig, memLimit MemBytes) *cgroupHook {
 func newCgroupHook(p mirrorProvider, cfg cgroupConfig, memLimit MemBytes) *cgroupHook {
-	var (
-		basePath = cfg.BasePath
-		baseGroup = cfg.Group
-		subsystem = cfg.Subsystem
-	)
-	if basePath == "" {
-		basePath = "/sys/fs/cgroup"
-	}
-	if baseGroup == "" {
-		baseGroup = "tunasync"
-	}
-	if subsystem == "" {
-		subsystem = "cpu"
-	}
 	return &cgroupHook{
 	return &cgroupHook{
 		emptyHook: emptyHook{
 		emptyHook: emptyHook{
 			provider: p,
 			provider: p,
 		},
 		},
-		basePath:  basePath,
-		baseGroup: baseGroup,
-		subsystem: subsystem,
+		cgCfg: cfg,
 	}
 	}
 }
 }
 
 
 func (c *cgroupHook) preExec() error {
 func (c *cgroupHook) preExec() error {
-	c.created = true
-	if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil {
-		return err
-	}
-	if c.subsystem != "memory" {
-		return nil
-	}
-	if c.memLimit != 0 {
-		gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name())
-		return sh.Command(
-			"cgset", "-r",
-			fmt.Sprintf("memory.limit_in_bytes=%d", c.memLimit.Value()),
-			gname,
-		).Run()
+	if c.cgCfg.isUnified {
+		logger.Debugf("Creating v2 cgroup for task %s", c.provider.Name())
+		var resSet *cgv2.Resources
+		if c.memLimit != 0 {
+			resSet = &cgv2.Resources {
+				Memory: &cgv2.Memory{
+					Max: func(i int64) *int64 { return &i }(c.memLimit.Value()),
+				},
+			}
+		}
+		subMgr, err := c.cgMgrV2.NewChild(c.provider.Name(), resSet)
+		if err != nil {
+			logger.Errorf("Failed to create cgroup for task %s: %s", c.provider.Name(), err.Error())
+			return err
+		}
+		c.cgMgrV2 = subMgr
+	} else {
+		logger.Debugf("Creating v1 cgroup for task %s", c.provider.Name())
+		var resSet *contspecs.LinuxResources
+		if c.memLimit != 0 {
+			resSet = &contspecs.LinuxResources {
+				Memory: &contspecs.LinuxMemory{
+					Limit: func(i int64) *int64 { return &i }(c.memLimit.Value()),
+				},
+			}
+		}
+		subMgr, err := c.cgMgrV1.New(c.provider.Name(), resSet)
+		if err != nil {
+			logger.Errorf("Failed to create cgroup for task %s: %s", c.provider.Name(), err.Error())
+			return err
+		}
+		c.cgMgrV1 = subMgr
 	}
 	}
 	return nil
 	return nil
 }
 }
@@ -289,36 +287,59 @@ func (c *cgroupHook) postExec() error {
 		logger.Errorf("Error killing tasks: %s", err.Error())
 		logger.Errorf("Error killing tasks: %s", err.Error())
 	}
 	}
 
 
-	c.created = false
-	return sh.Command("cgdelete", c.Cgroup()).Run()
-}
-
-func (c *cgroupHook) Cgroup() string {
-	name := c.provider.Name()
-	return fmt.Sprintf("%s:%s/%s", c.subsystem, c.baseGroup, name)
+	if c.cgCfg.isUnified {
+		logger.Debugf("Deleting v2 cgroup for task %s", c.provider.Name())
+		if err := c.cgMgrV2.Delete(); err != nil {
+			logger.Errorf("Failed to delete cgroup for task %s: %s", c.provider.Name(), err.Error())
+			return err
+		}
+		c.cgMgrV2 = nil
+	} else {
+		logger.Debugf("Deleting v1 cgroup for task %s", c.provider.Name())
+		if err := c.cgMgrV1.Delete(); err != nil {
+			logger.Errorf("Failed to delete cgroup for task %s: %s", c.provider.Name(), err.Error())
+			return err
+		}
+		c.cgMgrV1 = nil
+	}
+	return nil
 }
 }
 
 
 func (c *cgroupHook) killAll() error {
 func (c *cgroupHook) killAll() error {
-	if !c.created {
-		return nil
+	if c.cgCfg.isUnified {
+		if c.cgMgrV2 == nil {
+			return nil
+		}
+	} else {
+		if c.cgMgrV1 == nil {
+			return nil
+		}
 	}
 	}
-	name := c.provider.Name()
 
 
 	readTaskList := func() ([]int, error) {
 	readTaskList := func() ([]int, error) {
 		taskList := []int{}
 		taskList := []int{}
-		taskFile, err := os.Open(filepath.Join(c.basePath, c.subsystem, c.baseGroup, name, "tasks"))
-		if err != nil {
-			return taskList, err
-		}
-		defer taskFile.Close()
-
-		scanner := bufio.NewScanner(taskFile)
-		for scanner.Scan() {
-			pid, err := strconv.Atoi(scanner.Text())
-			if err != nil {
-				return taskList, err
+		if c.cgCfg.isUnified {
+			procs, err := c.cgMgrV2.Procs(false)
+			if (err != nil) {
+				return []int{}, err
+			}
+			for _, proc := range procs {
+				taskList = append(taskList, int(proc))
+			}
+		} else {
+			taskSet := make(map[int]struct{})
+			for _, subsys := range(c.cgMgrV1.Subsystems()) {
+				procs, err := c.cgMgrV1.Processes(subsys.Name(), false)
+				if err != nil {
+					return []int{}, err
+				}
+				for _, proc := range(procs) {
+					taskSet[proc.Pid] = struct{}{}
+				}
+			}
+			for proc := range(taskSet) {
+				taskList = append(taskList, proc)
 			}
 			}
-			taskList = append(taskList, pid)
 		}
 		}
 		return taskList, nil
 		return taskList, nil
 	}
 	}

+ 5 - 4
worker/runner.go

@@ -70,10 +70,11 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
 		cmd = exec.Command(c, args...)
 		cmd = exec.Command(c, args...)
 
 
 	} else if provider.Cgroup() != nil {
 	} else if provider.Cgroup() != nil {
-		c := "cgexec"
-		args := []string{"-g", provider.Cgroup().Cgroup()}
-		args = append(args, cmdAndArgs...)
-		cmd = exec.Command(c, args...)
+		//c := "cgexec"
+		//args := []string{"-g", provider.Cgroup().Cgroup()}
+		//args = append(args, cmdAndArgs...)
+		//cmd = exec.Command(c, args...)
+		cmd = exec.Command(cmdAndArgs[0], cmdAndArgs[1:]...)
 
 
 	} else {
 	} else {
 		if len(cmdAndArgs) == 1 {
 		if len(cmdAndArgs) == 1 {