Browse Source

feature(worker): limit rsync memory using cgroup

bigeagle 9 years ago
parent
commit
28c8145137
5 changed files with 65 additions and 6 deletions
  1. 1 1
      .travis.yml
  2. 2 2
      systemd/tunasync-worker.service
  3. 25 3
      worker/cgroup.go
  4. 36 0
      worker/cgroup_test.go
  5. 1 0
      worker/worker.go

+ 1 - 1
.travis.yml

@@ -11,7 +11,7 @@ os:
     - linux
 
 before_script:
-    - sudo cgcreate -t travis -a travis -g cpu:tunasync
+    - sudo cgcreate -t travis -a travis -g memory:tunasync
 
 script:
     - ./.testandcover.bash

+ 2 - 2
systemd/tunasync-worker.service

@@ -6,10 +6,10 @@ After=network.target
 Type=simple
 User=tunasync
 PermissionsStartOnly=true
-ExecStartPre=/usr/bin/cgcreate -t tunasync -a tunasync -g cpu:tunasync
+ExecStartPre=/usr/bin/cgcreate -t tunasync -a tunasync -g memory:tunasync
 ExecStart=/home/bin/tunasync worker -c /etc/tunasync/worker.conf --with-systemd
 ExecReload=/bin/kill -SIGHUP $MAINPID
-ExecStopPost=/usr/bin/cgdelete cpu:tunasync
+ExecStopPost=/usr/bin/cgdelete memory:tunasync
 
 [Install]
 WantedBy=multi-user.target

+ 25 - 3
worker/cgroup.go

@@ -13,6 +13,8 @@ import (
 	"github.com/codeskyblue/go-sh"
 )
 
+var cgSubsystem string = "cpu"
+
 type cgroupHook struct {
 	emptyHook
 	provider  mirrorProvider
@@ -21,6 +23,14 @@ type cgroupHook struct {
 	created   bool
 }
 
+func initCgroup(basePath string) {
+	if _, err := os.Stat(filepath.Join(basePath, "memory")); err == nil {
+		cgSubsystem = "memory"
+		return
+	}
+	logger.Warning("Memory subsystem of cgroup not enabled, fallback to cpu")
+}
+
 func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook {
 	if basePath == "" {
 		basePath = "/sys/fs/cgroup"
@@ -37,7 +47,19 @@ func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook {
 
 func (c *cgroupHook) preExec() error {
 	c.created = true
-	return sh.Command("cgcreate", "-g", c.Cgroup()).Run()
+	if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil {
+		return err
+	}
+	if cgSubsystem != "memory" {
+		return nil
+	}
+	if c.provider.Type() == provRsync || c.provider.Type() == provTwoStageRsync {
+		gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name())
+		return sh.Command(
+			"cgset", "-r", "memory.limit_in_bytes=128M", gname,
+		).Run()
+	}
+	return nil
 }
 
 func (c *cgroupHook) postExec() error {
@@ -52,7 +74,7 @@ func (c *cgroupHook) postExec() error {
 
 func (c *cgroupHook) Cgroup() string {
 	name := c.provider.Name()
-	return fmt.Sprintf("cpu:%s/%s", c.baseGroup, name)
+	return fmt.Sprintf("%s:%s/%s", cgSubsystem, c.baseGroup, name)
 }
 
 func (c *cgroupHook) killAll() error {
@@ -60,7 +82,7 @@ func (c *cgroupHook) killAll() error {
 		return nil
 	}
 	name := c.provider.Name()
-	taskFile, err := os.Open(filepath.Join(c.basePath, "cpu", c.baseGroup, name, "tasks"))
+	taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks"))
 	if err != nil {
 		return err
 	}

+ 36 - 0
worker/cgroup_test.go

@@ -4,6 +4,7 @@ import (
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -71,6 +72,7 @@ sleep 30
 		provider, err := newCmdProvider(c)
 		So(err, ShouldBeNil)
 
+		initCgroup("/sys/fs/cgroup")
 		cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
 		provider.AddHook(cg)
 
@@ -105,4 +107,38 @@ sleep 30
 		So(os.IsNotExist(err), ShouldBeTrue)
 
 	})
+
+	Convey("Rsync Memory Should Be Limited", t, func() {
+		tmpDir, err := ioutil.TempDir("", "tunasync")
+		defer os.RemoveAll(tmpDir)
+		So(err, ShouldBeNil)
+		scriptFile := filepath.Join(tmpDir, "myrsync")
+		tmpFile := filepath.Join(tmpDir, "log_file")
+
+		c := rsyncConfig{
+			name:        "tuna-cgroup",
+			upstreamURL: "rsync://rsync.tuna.moe/tuna/",
+			rsyncCmd:    scriptFile,
+			workingDir:  tmpDir,
+			logDir:      tmpDir,
+			logFile:     tmpFile,
+			useIPv6:     true,
+			interval:    600 * time.Second,
+		}
+
+		provider, err := newRsyncProvider(c)
+		So(err, ShouldBeNil)
+
+		initCgroup("/sys/fs/cgroup")
+		cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
+		provider.AddHook(cg)
+
+		cg.preExec()
+		if cgSubsystem == "memory" {
+			memoLimit, err := ioutil.ReadFile(filepath.Join(cg.basePath, "memory", cg.baseGroup, provider.Name(), "memory.limit_in_bytes"))
+			So(err, ShouldBeNil)
+			So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(128*1024*1024))
+		}
+		cg.postExec()
+	})
 }

+ 1 - 0
worker/worker.go

@@ -53,6 +53,7 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
 		w.httpClient = httpClient
 	}
 
+	initCgroup(cfg.Cgroup.BasePath)
 	w.initJobs()
 	w.makeHTTPServer()
 	tunasyncWorker = w