Bladeren bron

feature(worker): use cgroup to track job processes, so that they can all be killed

bigeagle 9 jaren geleden
bovenliggende
commit
924fda6dd8
11 gewijzigde bestanden met toevoegingen van 250 en 17 verwijderingen
  1. 6 2
      .travis.yml
  2. 5 0
      tests/worker.conf
  3. 83 0
      worker/cgroup.go
  4. 108 0
      worker/cgroup_test.go
  5. 1 1
      worker/cmd_provider.go
  6. 7 0
      worker/config.go
  7. 11 1
      worker/provider.go
  8. 1 1
      worker/rsync_provider.go
  9. 17 9
      worker/runner.go
  10. 1 1
      worker/two_stage_rsync_provider.go
  11. 10 2
      worker/worker.go

+ 6 - 2
.travis.yml

@@ -3,12 +3,16 @@ go:
     - 1.6
 
 before_install:
-  - go get golang.org/x/tools/cmd/cover
-  - go get -v github.com/mattn/goveralls
+    - sudo apt-get install cgroup-bin
+    - go get golang.org/x/tools/cmd/cover
+    - go get -v github.com/mattn/goveralls
 
 os:
     - linux
 
+before_script:
+    - sudo cgcreate -t travis -a travis -g cpu:tunasync
+
 script:
     - ./.testandcover.bash
 

+ 5 - 0
tests/worker.conf

@@ -10,6 +10,11 @@ api_base = "https://localhost:12345"
 token = "some_token"
 ca_cert = "rootCA.crt"
 
+[cgroup]
+enable = true
+base_path = "/sys/fs/cgroup"
+group = "tunasync"
+
 [server]
 hostname = "localhost"
 listen_addr = "127.0.0.1"

+ 83 - 0
worker/cgroup.go

@@ -0,0 +1,83 @@
+package worker
+
+import (
+	"bufio"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"syscall"
+
+	"golang.org/x/sys/unix"
+
+	"github.com/codeskyblue/go-sh"
+)
+
+// cgroupHook is a job hook that confines a mirror job's processes to a
+// dedicated cgroup, so that all of them — including daemonized children
+// that escape the job's process group — can be killed together.
+type cgroupHook struct {
+	emptyHook
+	provider  mirrorProvider // provider this hook is attached to; supplies the job name
+	basePath  string         // cgroup filesystem mount point, e.g. "/sys/fs/cgroup"
+	baseGroup string         // parent cgroup under which per-job groups are created
+	created   bool           // true after preExec has created the cgroup
+}
+
+// newCgroupHook creates a cgroup hook for the given provider.
+// An empty basePath defaults to "/sys/fs/cgroup" and an empty baseGroup
+// defaults to "tunasync".
+func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook {
+	if basePath == "" {
+		basePath = "/sys/fs/cgroup"
+	}
+	if baseGroup == "" {
+		baseGroup = "tunasync"
+	}
+	return &cgroupHook{
+		provider:  p,
+		basePath:  basePath,
+		baseGroup: baseGroup,
+	}
+}
+
+// preExec creates the per-job cgroup (e.g. "cpu:tunasync/<name>") with
+// cgcreate before the job starts; the job command is then launched inside
+// it via cgexec (see newCmdJob).
+func (c *cgroupHook) preExec() error {
+	// Mark created before running cgcreate so killAll/postExec know a
+	// cleanup attempt is warranted even if cgcreate partially succeeded.
+	c.created = true
+	return sh.Command("cgcreate", "-g", c.Cgroup()).Run()
+}
+
+// postExec kills any processes still listed in the cgroup, then removes
+// the cgroup itself with cgdelete. A kill failure is logged but does not
+// prevent the cgdelete attempt.
+func (c *cgroupHook) postExec() error {
+	err := c.killAll()
+	if err != nil {
+		logger.Error("Error killing tasks: %s", err.Error())
+	}
+
+	c.created = false
+	return sh.Command("cgdelete", c.Cgroup()).Run()
+}
+
+// Cgroup returns the controller:path spec of the per-job cgroup,
+// e.g. "cpu:tunasync/<mirror-name>", in the form expected by
+// cgcreate/cgexec/cgdelete.
+func (c *cgroupHook) Cgroup() string {
+	name := c.provider.Name()
+	return fmt.Sprintf("cpu:%s/%s", c.baseGroup, name)
+}
+
+// killAll sends SIGKILL to every process currently listed in the cgroup's
+// "tasks" file, so daemonized children of the job are killed too. It is a
+// no-op if preExec has not created the cgroup.
+func (c *cgroupHook) killAll() error {
+	if !c.created {
+		return nil
+	}
+	name := c.provider.Name()
+	taskFile, err := os.Open(filepath.Join(c.basePath, "cpu", c.baseGroup, name, "tasks"))
+	if err != nil {
+		return err
+	}
+	defer taskFile.Close()
+	// Collect all PIDs first, then kill: killing while still scanning
+	// could perturb the tasks file contents mid-read.
+	taskList := []int{}
+	scanner := bufio.NewScanner(taskFile)
+	for scanner.Scan() {
+		pid, err := strconv.Atoi(scanner.Text())
+		if err != nil {
+			return err
+		}
+		taskList = append(taskList, pid)
+	}
+	// Fix: surface scanner read errors instead of silently treating a
+	// truncated read of the tasks file as success.
+	if err := scanner.Err(); err != nil {
+		return err
+	}
+	for _, pid := range taskList {
+		logger.Debug("Killing process: %d", pid)
+		// Kill errors are intentionally ignored: the process may have
+		// already exited between the read and the kill.
+		unix.Kill(pid, syscall.SIGKILL)
+	}
+
+	return nil
+}

+ 108 - 0
worker/cgroup_test.go

@@ -0,0 +1,108 @@
+package worker
+
+import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+	"time"
+
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+// TestCgroup verifies that the cgroup hook can kill daemonized child
+// processes that survive an ordinary provider.Terminate(). It requires a
+// pre-created "cpu:tunasync" cgroup writable by the test user (see the
+// cgcreate step in .travis.yml).
+func TestCgroup(t *testing.T) {
+	Convey("Cgroup Should Work", t, func(ctx C) {
+		tmpDir, err := ioutil.TempDir("", "tunasync")
+		defer os.RemoveAll(tmpDir)
+		So(err, ShouldBeNil)
+		cmdScript := filepath.Join(tmpDir, "cmd.sh")
+		daemonScript := filepath.Join(tmpDir, "daemon.sh")
+		tmpFile := filepath.Join(tmpDir, "log_file")
+		bgPidfile := filepath.Join(tmpDir, "bg.pid")
+
+		c := cmdConfig{
+			name:        "tuna-cgroup",
+			upstreamURL: "http://mirrors.tuna.moe/",
+			command:     cmdScript + " " + daemonScript,
+			workingDir:  tmpDir,
+			logDir:      tmpDir,
+			logFile:     tmpFile,
+			interval:    600 * time.Second,
+			env: map[string]string{
+				"BG_PIDFILE": bgPidfile,
+			},
+		}
+		// cmd.sh fully daemonizes its argument (setsid, detached stdio,
+		// closed fds) so the child escapes the job's process group, then
+		// sleeps so the job itself stays running long enough to terminate.
+		cmdScriptContent := `#!/bin/bash
+redirect-std() {
+    [[ -t 0 ]] && exec </dev/null
+    [[ -t 1 ]] && exec >/dev/null
+    [[ -t 2 ]] && exec 2>/dev/null
+}
+
+# close all non-std* fds
+close-fds() {
+    eval exec {3..255}\>\&-
+}
+ 
+# full daemonization of external command with setsid
+daemonize() {
+    (
+        redirect-std    
+        cd /            
+        close-fds       
+        exec setsid "$@"
+    ) &
+}
+
+echo $$
+daemonize $@
+sleep 5
+`
+		// daemon.sh records its own PID so the test can check liveness
+		// via /proc/<pid>.
+		daemonScriptContent := `#!/bin/bash
+echo $$ > $BG_PIDFILE
+sleep 30
+`
+		err = ioutil.WriteFile(cmdScript, []byte(cmdScriptContent), 0755)
+		So(err, ShouldBeNil)
+		err = ioutil.WriteFile(daemonScript, []byte(daemonScriptContent), 0755)
+		So(err, ShouldBeNil)
+
+		provider, err := newCmdProvider(c)
+		So(err, ShouldBeNil)
+
+		cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
+		provider.AddHook(cg)
+
+		err = cg.preExec()
+		So(err, ShouldBeNil)
+
+		// NOTE(review): this goroutine writes the outer `err` while the
+		// main goroutine also assigns it below — a data race under
+		// `go test -race`; consider a dedicated variable.
+		go func() {
+			err = provider.Run()
+			ctx.So(err, ShouldNotBeNil)
+		}()
+
+		time.Sleep(1 * time.Second)
+		// Daemon should be started
+		daemonPidBytes, err := ioutil.ReadFile(bgPidfile)
+		So(err, ShouldBeNil)
+		daemonPid := strings.Trim(string(daemonPidBytes), " \n")
+		logger.Debug("daemon pid: %s", daemonPid)
+		procDir := filepath.Join("/proc", daemonPid)
+		_, err = os.Stat(procDir)
+		So(err, ShouldBeNil)
+
+		err = provider.Terminate()
+		So(err, ShouldBeNil)
+
+		// Daemon won't be killed by Terminate() — it detached via setsid
+		_, err = os.Stat(procDir)
+		So(err, ShouldBeNil)
+
+		// Daemon can be killed by the cgroup killer
+		cg.postExec()
+		_, err = os.Stat(procDir)
+		So(os.IsNotExist(err), ShouldBeTrue)
+
+	})
+}

+ 1 - 1
worker/cmd_provider.go

@@ -65,7 +65,7 @@ func (p *cmdProvider) Start() error {
 	for k, v := range p.env {
 		env[k] = v
 	}
-	p.cmd = newCmdJob(p.command, p.WorkingDir(), env)
+	p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env)
 	if err := p.prepareLogFile(); err != nil {
 		return err
 	}

+ 7 - 0
worker/config.go

@@ -35,6 +35,7 @@ type Config struct {
 	Global  globalConfig   `toml:"global"`
 	Manager managerConfig  `toml:"manager"`
 	Server  serverConfig   `toml:"server"`
+	Cgroup  cgroupConfig   `toml:"cgroup"`
 	Mirrors []mirrorConfig `toml:"mirrors"`
 }
 
@@ -60,6 +61,12 @@ type serverConfig struct {
 	SSLKey   string `toml:"ssl_key"`
 }
 
+// cgroupConfig holds the [cgroup] section of the worker configuration,
+// controlling whether job processes are confined to a cgroup.
+type cgroupConfig struct {
+	Enable   bool   `toml:"enable"`    // enable cgroup confinement for job processes
+	BasePath string `toml:"base_path"` // cgroup fs mount point; "" falls back to /sys/fs/cgroup
+	Group    string `toml:"group"`     // parent cgroup name; "" falls back to "tunasync"
+}
+
 type mirrorConfig struct {
 	Name      string            `toml:"name"`
 	Provider  ProviderEnum      `toml:"provider"`

+ 11 - 1
worker/provider.go

@@ -33,6 +33,8 @@ type mirrorProvider interface {
 	Terminate() error
 	// job hooks
 	IsRunning() bool
+	// Cgroup
+	Cgroup() *cgroupHook
 
 	AddHook(hook jobHook)
 	Hooks() []jobHook
@@ -63,7 +65,8 @@ type baseProvider struct {
 
 	logFile *os.File
 
-	hooks []jobHook
+	cgroup *cgroupHook
+	hooks  []jobHook
 }
 
 func (p *baseProvider) Name() string {
@@ -117,6 +120,9 @@ func (p *baseProvider) LogFile() string {
 }
 
 func (p *baseProvider) AddHook(hook jobHook) {
+	if cg, ok := hook.(*cgroupHook); ok {
+		p.cgroup = cg
+	}
 	p.hooks = append(p.hooks, hook)
 }
 
@@ -124,6 +130,10 @@ func (p *baseProvider) Hooks() []jobHook {
 	return p.hooks
 }
 
+// Cgroup returns the cgroup hook registered via AddHook, or nil if the
+// provider has no cgroup hook (cgroup support disabled).
+func (p *baseProvider) Cgroup() *cgroupHook {
+	return p.cgroup
+}
+
 func (p *baseProvider) prepareLogFile() error {
 	if p.LogFile() == "/dev/null" {
 		p.cmd.SetLogFile(nil)

+ 1 - 1
worker/rsync_provider.go

@@ -84,7 +84,7 @@ func (p *rsyncProvider) Start() error {
 	command = append(command, p.options...)
 	command = append(command, p.upstreamURL, p.WorkingDir())
 
-	p.cmd = newCmdJob(command, p.WorkingDir(), env)
+	p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
 	if err := p.prepareLogFile(); err != nil {
 		return err
 	}

+ 17 - 9
worker/runner.go

@@ -13,7 +13,6 @@ import (
 
 // runner is to run os commands giving command line, env and log file
 // it's an alternative to python-sh or go-sh
-// TODO: cgroup excution
 
 var errProcessNotStarted = errors.New("Process Not Started")
 
@@ -23,18 +22,27 @@ type cmdJob struct {
 	env        map[string]string
 	logFile    *os.File
 	finished   chan empty
+	provider   mirrorProvider
 }
 
-func newCmdJob(cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob {
+func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob {
 	var cmd *exec.Cmd
-	if len(cmdAndArgs) == 1 {
-		cmd = exec.Command(cmdAndArgs[0])
-	} else if len(cmdAndArgs) > 1 {
-		c := cmdAndArgs[0]
-		args := cmdAndArgs[1:]
+
+	if provider.Cgroup() != nil {
+		c := "cgexec"
+		args := []string{"-g", provider.Cgroup().Cgroup()}
+		args = append(args, cmdAndArgs...)
 		cmd = exec.Command(c, args...)
-	} else if len(cmdAndArgs) == 0 {
-		panic("Command length should be at least 1!")
+	} else {
+		if len(cmdAndArgs) == 1 {
+			cmd = exec.Command(cmdAndArgs[0])
+		} else if len(cmdAndArgs) > 1 {
+			c := cmdAndArgs[0]
+			args := cmdAndArgs[1:]
+			cmd = exec.Command(c, args...)
+		} else if len(cmdAndArgs) == 0 {
+			panic("Command length should be at least 1!")
+		}
 	}
 
 	logger.Debug("Executing command %s at %s", cmdAndArgs[0], workingDir)

+ 1 - 1
worker/two_stage_rsync_provider.go

@@ -120,7 +120,7 @@ func (p *twoStageRsyncProvider) Run() error {
 		command = append(command, options...)
 		command = append(command, p.upstreamURL, p.WorkingDir())
 
-		p.cmd = newCmdJob(command, p.WorkingDir(), env)
+		p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
 		if err := p.prepareLogFile(); err != nil {
 			return err
 		}

+ 10 - 2
worker/worker.go

@@ -152,6 +152,14 @@ func (w *Worker) initProviders() {
 		}
 
 		provider.AddHook(newLogLimiter(provider))
+
+		// Add Cgroup Hook
+		if w.cfg.Cgroup.Enable {
+			provider.AddHook(
+				newCgroupHook(provider, w.cfg.Cgroup.BasePath, w.cfg.Cgroup.Group),
+			)
+		}
+
 		w.providers[provider.Name()] = provider
 
 	}
@@ -198,13 +206,13 @@ func (w *Worker) makeHTTPServer() {
 		case CmdStop:
 			// if job is disabled, no goroutine would be there
 			// receiving this signal
+			w.schedule.Remove(job.Name())
 			if job.State() != stateDisabled {
-				w.schedule.Remove(job.Name())
 				job.ctrlChan <- jobStop
 			}
 		case CmdDisable:
+			w.schedule.Remove(job.Name())
 			if job.State() != stateDisabled {
-				w.schedule.Remove(job.Name())
 				job.ctrlChan <- jobDisable
 				<-job.disabled
 			}