Browse Source

fix(worker): improved cgroup creation

bigeagle 8 năm trước cách đây
mục cha
commit
7601e5793f
6 tập tin đã thay đổi với 49 bổ sung41 xóa
  1. 2 2
      .travis.yml
  2. 20 22
      worker/cgroup.go
  3. 17 10
      worker/cgroup_test.go
  4. 6 3
      worker/config.go
  5. 4 1
      worker/provider.go
  6. 0 3
      worker/worker.go

+ 2 - 2
.travis.yml

@@ -17,8 +17,8 @@ services:
     - docker
 
 before_script:
-    - sudo mount -t memory -o memory memory /sys/fs/cgroup/memory
-    - mount
+    - lssubsys -am
+    - sudo cgcreate -a $USER -t $USER -g cpu:tunasync
     - sudo cgcreate -a $USER -t $USER -g memory:tunasync
     - docker pull alpine
 

+ 20 - 22
worker/cgroup.go

@@ -15,35 +15,31 @@ import (
 	"github.com/codeskyblue/go-sh"
 )
 
-var cgSubsystem = "cpuset"
-
 type cgroupHook struct {
 	emptyHook
 	provider  mirrorProvider
 	basePath  string
 	baseGroup string
 	created   bool
+	subsystem string
+	memLimit  string
 }
 
-func initCgroup(basePath string) {
-	if _, err := os.Stat(filepath.Join(basePath, "memory")); err == nil {
-		cgSubsystem = "memory"
-		return
-	}
-	logger.Warning("Memory subsystem of cgroup not enabled, fallback to cpu")
-}
-
-func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook {
+func newCgroupHook(p mirrorProvider, basePath, baseGroup, subsystem, memLimit string) *cgroupHook {
 	if basePath == "" {
 		basePath = "/sys/fs/cgroup"
 	}
 	if baseGroup == "" {
 		baseGroup = "tunasync"
 	}
+	if subsystem == "" {
+		subsystem = "cpu"
+	}
 	return &cgroupHook{
 		provider:  p,
 		basePath:  basePath,
 		baseGroup: baseGroup,
+		subsystem: subsystem,
 	}
 }
 
@@ -52,15 +48,17 @@ func (c *cgroupHook) preExec() error {
 	if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil {
 		return err
 	}
-	// if cgSubsystem != "memory" {
-	// 	return nil
-	// }
-	// if c.provider.Type() == provRsync || c.provider.Type() == provTwoStageRsync {
-	// 	gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name())
-	// 	return sh.Command(
-	// 		"cgset", "-r", "memory.limit_in_bytes=512M", gname,
-	// 	).Run()
-	// }
+	if c.subsystem != "memory" {
+		return nil
+	}
+	if c.memLimit != "" {
+		gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name())
+		return sh.Command(
+			"cgset", "-r",
+			fmt.Sprintf("memory.limit_in_bytes=%s", c.memLimit),
+			gname,
+		).Run()
+	}
 	return nil
 }
 
@@ -76,7 +74,7 @@ func (c *cgroupHook) postExec() error {
 
 func (c *cgroupHook) Cgroup() string {
 	name := c.provider.Name()
-	return fmt.Sprintf("%s:%s/%s", cgSubsystem, c.baseGroup, name)
+	return fmt.Sprintf("%s:%s/%s", c.subsystem, c.baseGroup, name)
 }
 
 func (c *cgroupHook) killAll() error {
@@ -87,7 +85,7 @@ func (c *cgroupHook) killAll() error {
 
 	readTaskList := func() ([]int, error) {
 		taskList := []int{}
-		taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks"))
+		taskFile, err := os.Open(filepath.Join(c.basePath, c.subsystem, c.baseGroup, name, "tasks"))
 		if err != nil {
 			return taskList, err
 		}

+ 17 - 10
worker/cgroup_test.go

@@ -4,6 +4,7 @@ import (
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -71,11 +72,14 @@ sleep 30
 		provider, err := newCmdProvider(c)
 		So(err, ShouldBeNil)
 
-		initCgroup("/sys/fs/cgroup")
-		cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
+		cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "")
 		provider.AddHook(cg)
 
 		err = cg.preExec()
+		if err != nil {
+			logger.Errorf("Failed to create cgroup")
+			return
+		}
 		So(err, ShouldBeNil)
 
 		go func() {
@@ -128,16 +132,19 @@ sleep 30
 		provider, err := newRsyncProvider(c)
 		So(err, ShouldBeNil)
 
-		initCgroup("/sys/fs/cgroup")
-		cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
+		cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "512M")
 		provider.AddHook(cg)
 
-		cg.preExec()
-		//if cgSubsystem == "memory" {
-		//	memoLimit, err := ioutil.ReadFile(filepath.Join(cg.basePath, "memory", cg.baseGroup, provider.Name(), "memory.limit_in_bytes"))
-		//	So(err, ShouldBeNil)
-		//	So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))
-		//}
+		err = cg.preExec()
+		if err != nil {
+			logger.Errorf("Failed to create cgroup")
+			return
+		}
+		if cg.subsystem == "memory" {
+			memoLimit, err := ioutil.ReadFile(filepath.Join(cg.basePath, "memory", cg.baseGroup, provider.Name(), "memory.limit_in_bytes"))
+			So(err, ShouldBeNil)
+			So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))
+		}
 		cg.postExec()
 	})
 }

+ 6 - 3
worker/config.go

@@ -70,9 +70,10 @@ type serverConfig struct {
 }
 
 type cgroupConfig struct {
-	Enable   bool   `toml:"enable"`
-	BasePath string `toml:"base_path"`
-	Group    string `toml:"group"`
+	Enable    bool   `toml:"enable"`
+	BasePath  string `toml:"base_path"`
+	Group     string `toml:"group"`
+	Subsystem string `toml:"subsystem"`
 }
 
 type dockerConfig struct {
@@ -119,6 +120,8 @@ type mirrorConfig struct {
 	Password      string `toml:"password"`
 	Stage1Profile string `toml:"stage1_profile"`
 
+	MemoryLimit string `toml:"memory_limit"`
+
 	DockerImage   string   `toml:"docker_image"`
 	DockerVolumes []string `toml:"docker_volumes"`
 	DockerOptions []string `toml:"docker_options"`

+ 4 - 1
worker/provider.go

@@ -178,7 +178,10 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 	} else if cfg.Cgroup.Enable {
 		// Add Cgroup Hook
 		provider.AddHook(
-			newCgroupHook(provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group),
+			newCgroupHook(
+				provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group,
+				cfg.Cgroup.Subsystem, mirror.MemoryLimit,
+			),
 		)
 	}
 

+ 0 - 3
worker/worker.go

@@ -55,9 +55,6 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
 		w.httpClient = httpClient
 	}
 
-	if cfg.Cgroup.Enable {
-		initCgroup(cfg.Cgroup.BasePath)
-	}
 	w.initJobs()
 	w.makeHTTPServer()
 	tunasyncWorker = w