Browse Source

[WIP] cgroupv2: add init cgroup

Miao Wang 4 years ago
parent
commit
02a144744f
3 changed files with 149 additions and 4 deletions
  1. 139 4
      worker/cgroup.go
  2. 4 0
      worker/cgroup_test.go
  3. 6 0
      worker/worker.go

+ 139 - 4
worker/cgroup.go

@@ -17,6 +17,7 @@ import (
 	"github.com/moby/moby/pkg/reexec"
 	cgv1 "github.com/containerd/cgroups"
 	cgv2 "github.com/containerd/cgroups/v2"
+	contspecs "github.com/opencontainers/runtime-spec/specs-go"
 )
 
 type cgroupHook struct {
@@ -66,6 +67,7 @@ func waitExec () {
 
 func initCgroup(cfg *cgroupConfig) (error) {
 
+	logger.Debugf("Initializing cgroup")
 	baseGroup := cfg.Group
 	//subsystem := cfg.Subsystem
 
@@ -78,27 +80,160 @@ func initCgroup(cfg *cgroupConfig) (error) {
 	cfg.isUnified = cgv1.Mode() == cgv1.Unified
 
 	if cfg.isUnified {
-		var err error
+		logger.Debugf("Cgroup V2 detected")
 		g := baseGroup
 		if g == "" {
+			logger.Debugf("Detecting my cgroup path")
+			var err error
 			if g, err = cgv2.NestedGroupPath(""); err != nil {
 				return err
 			}
 		}
+		logger.Infof("Using cgroup path: %s", g)
+
+		var err error
 		if cfg.cgMgrV2, err = cgv2.LoadManager("/sys/fs/cgroup", g); err != nil {
 			return err
 		}
+		if baseGroup == "" {
+			logger.Debugf("Creating a sub group and move all processes into it")
+			wkrMgr, err := cfg.cgMgrV2.NewChild("__worker", nil);
+			if err != nil {
+				return err
+			}
+			for {
+				logger.Debugf("Reading pids")
+				procs, err := cfg.cgMgrV2.Procs(false)
+				if err != nil {
+					logger.Errorf("Cannot read pids in that group")
+					return err
+				}
+				if len(procs) == 0 {
+					break
+				}
+				for _, p := range(procs) {
+					if err := wkrMgr.AddProc(p); err != nil{
+						if errors.Is(err, syscall.ESRCH) {
+							logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
+						} else {
+							return err
+						}
+					}
+				}
+			}
+		} else {
+			logger.Debugf("Trying to create a sub group in that group")
+			testMgr, err := cfg.cgMgrV2.NewChild("__test", nil);
+			if err != nil {
+				logger.Errorf("Cannot create a sub group in the cgroup")
+				return err
+			}
+			if err := testMgr.Delete(); err != nil {
+				return err
+			}
+			procs, err := cfg.cgMgrV2.Procs(false)
+			if err != nil {
+				logger.Errorf("Cannot read pids in that group")
+				return err
+			}
+			if len(procs) != 0 {
+				return fmt.Errorf("There are remaining processes in cgroup %s", baseGroup)
+			}
+		}
 	} else {
-		var err error
+		logger.Debugf("Cgroup V1 detected")
 		var pather cgv1.Path
 		if baseGroup != "" {
 			pather = cgv1.StaticPath(baseGroup)
 		} else {
-			pather = cgv1.NestedPath("")
+			pather = (func(p cgv1.Path) (cgv1.Path){
+				return func(subsys cgv1.Name) (string, error){
+					path, err := p(subsys);
+					if err != nil {
+						return "", err
+					}
+					if path == "/" {
+						return "", cgv1.ErrControllerNotActive
+					}
+					return path, err
+				}
+			})(cgv1.NestedPath(""))
 		}
+		logger.Infof("Loading cgroup")
+		var err error
 		if cfg.cgMgrV1, err = cgv1.Load(cgv1.V1, pather); err != nil {
 			return err
 		}
+		logger.Debugf("Available subsystems:")
+		for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
+			p, err := pather(subsys.Name())
+			if err != nil {
+				return err
+			}
+			logger.Debugf("%s: %s", subsys.Name(), p)
+		}
+		if baseGroup == "" {
+			logger.Debugf("Creating a sub group and move all processes into it")
+			wkrMgr, err := cfg.cgMgrV1.New("__worker", &contspecs.LinuxResources{});
+			if err != nil {
+				return err
+			}
+			for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
+				logger.Debugf("Reading pids for subsystem %s", subsys.Name())
+				for {
+					procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false)
+					if err != nil {
+						p, err := pather(subsys.Name())
+						if err != nil {
+							return err
+						}
+						logger.Errorf("Cannot read pids in group %s of subsystem %s", p, subsys.Name())
+						return err
+					}
+					if len(procs) == 0 {
+						break
+					}
+					for _, proc := range(procs) {
+						if err := wkrMgr.Add(proc); err != nil {
+							if errors.Is(err, syscall.ESRCH) {
+								logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
+							} else {
+								return err
+							}
+						}
+					}
+				}
+			}
+		} else {
+			logger.Debugf("Trying to create a sub group in that group")
+			testMgr, err := cfg.cgMgrV1.New("__test", &contspecs.LinuxResources{});
+			if err != nil {
+				logger.Errorf("Cannot create a sub group in the cgroup")
+				return err
+			}
+			if err := testMgr.Delete(); err != nil {
+				return err
+			}
+			for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
+				logger.Debugf("Reading pids for subsystem %s", subsys.Name())
+				procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false)
+				if err != nil {
+					p, err := pather(subsys.Name())
+					if err != nil {
+						return err
+					}
+					logger.Errorf("Cannot read pids in group %s of subsystem %s", p, subsys.Name())
+					return err
+				}
+				if len(procs) != 0 {
+					p, err := pather(subsys.Name())
+					if err != nil {
+						return err
+					}
+					return fmt.Errorf("There are remaining processes in cgroup %s of subsystem %s", p, subsys.Name())
+				}
+			}
+		}
 	}
 
 	return nil
@@ -123,7 +258,7 @@ func newCgroupHook(p mirrorProvider, cfg cgroupConfig, memLimit MemBytes) *cgrou
 		emptyHook: emptyHook{
 			provider: p,
 		},
-		basePath:	basePath,
+		basePath:  basePath,
 		baseGroup: baseGroup,
 		subsystem: subsystem,
 	}

+ 4 - 0
worker/cgroup_test.go

@@ -74,6 +74,8 @@ sleep 30
 		So(err, ShouldBeNil)
 
 		cgcf := cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"}
+		err = initCgroup(&cgcf)
+		So(err, ShouldBeNil)
 		cg := newCgroupHook(provider, cgcf, 0)
 		provider.AddHook(cg)
 
@@ -135,6 +137,8 @@ sleep 30
 		So(err, ShouldBeNil)
 
 		cgcf := cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"}
+		err = initCgroup(&cgcf)
+		So(err, ShouldBeNil)
 		cg := newCgroupHook(provider, cgcf, 512 * units.MiB)
 		provider.AddHook(cg)
 

+ 6 - 0
worker/worker.go

@@ -54,6 +54,12 @@ func NewTUNASyncWorker(cfg *Config) *Worker {
 		w.httpClient = httpClient
 	}
 
+	if cfg.Cgroup.Enable {
+		if err := initCgroup(&cfg.Cgroup); err != nil {
+			logger.Errorf("Error initializing Cgroup: %s", err.Error())
+			return nil
+		}
+	}
 	w.initJobs()
 	w.makeHTTPServer()
 	return w