Merge pull request #56 from tuna/dev

Dev
bigeagle committed 8 years ago
commit 54740388b3
10 changed files with 320 additions and 42 deletions
  1. .travis.yml (+9 -1)
  2. worker/base_provider.go (+9 -1)
  3. worker/cgroup.go (+15 -16)
  4. worker/cgroup_test.go (+12 -6)
  5. worker/config.go (+17 -3)
  6. worker/docker.go (+98 -0)
  7. worker/docker_test.go (+97 -0)
  8. worker/provider.go (+12 -3)
  9. worker/runner.go (+51 -9)
  10. worker/worker.go (+0 -3)

.travis.yml (+9 -1)

@@ -1,3 +1,5 @@
+sudo: required
+
 language: go
 go:
     - 1.6
@@ -11,8 +13,14 @@ before_install:
 os:
     - linux
 
+services:
+    - docker
+
 before_script:
-    - sudo cgcreate -t travis -a travis -g memory:tunasync
+    - lssubsys -am
+    - sudo cgcreate -a $USER -t $USER -g cpu:tunasync
+    - sudo cgcreate -a $USER -t $USER -g memory:tunasync
+    - docker pull alpine
 
 script:
     - ./.testandcover.bash

worker/base_provider.go (+9 -1)

@@ -24,7 +24,9 @@ type baseProvider struct {
 
 	cgroup *cgroupHook
 	zfs    *zfsHook
-	hooks  []jobHook
+	docker *dockerHook
+
+	hooks []jobHook
 }
 
 func (p *baseProvider) Name() string {
@@ -87,6 +89,8 @@ func (p *baseProvider) AddHook(hook jobHook) {
 		p.cgroup = v
 	case *zfsHook:
 		p.zfs = v
+	case *dockerHook:
+		p.docker = v
 	}
 	p.hooks = append(p.hooks, hook)
 }
@@ -103,6 +107,10 @@ func (p *baseProvider) ZFS() *zfsHook {
 	return p.zfs
 }
 
+func (p *baseProvider) Docker() *dockerHook {
+	return p.docker
+}
+
 func (p *baseProvider) prepareLogFile() error {
 	if p.LogFile() == "/dev/null" {
 		p.cmd.SetLogFile(nil)

worker/cgroup.go (+15 -16)

@@ -15,35 +15,32 @@ import (
 	"github.com/codeskyblue/go-sh"
 )
 
-var cgSubsystem = "cpu"
-
 type cgroupHook struct {
 	emptyHook
 	provider  mirrorProvider
 	basePath  string
 	baseGroup string
 	created   bool
+	subsystem string
+	memLimit  string
 }
 
-func initCgroup(basePath string) {
-	if _, err := os.Stat(filepath.Join(basePath, "memory")); err == nil {
-		cgSubsystem = "memory"
-		return
-	}
-	logger.Warning("Memory subsystem of cgroup not enabled, fallback to cpu")
-}
-
-func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook {
+func newCgroupHook(p mirrorProvider, basePath, baseGroup, subsystem, memLimit string) *cgroupHook {
 	if basePath == "" {
 		basePath = "/sys/fs/cgroup"
 	}
 	if baseGroup == "" {
 		baseGroup = "tunasync"
 	}
+	if subsystem == "" {
+		subsystem = "cpu"
+	}
 	return &cgroupHook{
 		provider:  p,
 		basePath:  basePath,
 		baseGroup: baseGroup,
+		subsystem: subsystem,
+		memLimit:  memLimit,
 	}
 }
 
@@ -52,13 +48,15 @@ func (c *cgroupHook) preExec() error {
 	if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil {
 		return err
 	}
-	if cgSubsystem != "memory" {
+	if c.subsystem != "memory" {
 		return nil
 	}
-	if c.provider.Type() == provRsync || c.provider.Type() == provTwoStageRsync {
+	if c.memLimit != "" {
 		gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name())
 		return sh.Command(
-			"cgset", "-r", "memory.limit_in_bytes=512M", gname,
+			"cgset", "-r",
+			fmt.Sprintf("memory.limit_in_bytes=%s", c.memLimit),
+			gname,
 		).Run()
 	}
 	return nil
@@ -76,7 +74,7 @@ func (c *cgroupHook) postExec() error {
 
 func (c *cgroupHook) Cgroup() string {
 	name := c.provider.Name()
-	return fmt.Sprintf("%s:%s/%s", cgSubsystem, c.baseGroup, name)
+	return fmt.Sprintf("%s:%s/%s", c.subsystem, c.baseGroup, name)
 }
 
 func (c *cgroupHook) killAll() error {
@@ -87,7 +85,7 @@ func (c *cgroupHook) killAll() error {
 
 	readTaskList := func() ([]int, error) {
 		taskList := []int{}
-		taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks"))
+		taskFile, err := os.Open(filepath.Join(c.basePath, c.subsystem, c.baseGroup, name, "tasks"))
 		if err != nil {
 			return taskList, err
 		}
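
A note on the new constructor: with the package-global cgSubsystem and initCgroup gone, each provider now carries its own cgroup hook with an explicit subsystem and an optional per-mirror memory limit. A minimal usage sketch (all values hypothetical, not taken from this PR):

    cg := newCgroupHook(provider, "", "", "memory", "512M") // empty basePath/baseGroup default to /sys/fs/cgroup and tunasync
    provider.AddHook(cg)
    // preExec then runs `cgcreate -g memory:tunasync/<name>` and, because
    // memLimit is non-empty, `cgset -r memory.limit_in_bytes=512M <group>`.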

worker/cgroup_test.go (+12 -6)

@@ -72,11 +72,14 @@ sleep 30
 		provider, err := newCmdProvider(c)
 		So(err, ShouldBeNil)
 
-		initCgroup("/sys/fs/cgroup")
-		cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
+		cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "")
 		provider.AddHook(cg)
 
 		err = cg.preExec()
+		if err != nil {
+			logger.Errorf("Failed to create cgroup")
+			return
+		}
 		So(err, ShouldBeNil)
 
 		go func() {
@@ -129,12 +132,15 @@ sleep 30
 		provider, err := newRsyncProvider(c)
 		So(err, ShouldBeNil)
 
-		initCgroup("/sys/fs/cgroup")
-		cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
+		cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "512M")
 		provider.AddHook(cg)
 
-		cg.preExec()
-		if cgSubsystem == "memory" {
+		err = cg.preExec()
+		if err != nil {
+			logger.Errorf("Failed to create cgroup")
+			return
+		}
+		if cg.subsystem == "memory" {
 			memoLimit, err := ioutil.ReadFile(filepath.Join(cg.basePath, "memory", cg.baseGroup, provider.Name(), "memory.limit_in_bytes"))
 			So(err, ShouldBeNil)
 			So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))

worker/config.go (+17 -3)

@@ -38,6 +38,7 @@ type Config struct {
 	Server  serverConfig   `toml:"server"`
 	Cgroup  cgroupConfig   `toml:"cgroup"`
 	ZFS     zfsConfig      `toml:"zfs"`
+	Docker  dockerConfig   `toml:"docker"`
 	Include includeConfig  `toml:"include"`
 	Mirrors []mirrorConfig `toml:"mirrors"`
 }
@@ -69,9 +70,16 @@ type serverConfig struct {
 }
 
 type cgroupConfig struct {
-	Enable   bool   `toml:"enable"`
-	BasePath string `toml:"base_path"`
-	Group    string `toml:"group"`
+	Enable    bool   `toml:"enable"`
+	BasePath  string `toml:"base_path"`
+	Group     string `toml:"group"`
+	Subsystem string `toml:"subsystem"`
+}
+
+type dockerConfig struct {
+	Enable  bool     `toml:"enable"`
+	Volumes []string `toml:"volumes"`
+	Options []string `toml:"options"`
 }
 
 type zfsConfig struct {
@@ -111,6 +119,12 @@ type mirrorConfig struct {
 	Username      string `toml:"username"`
 	Password      string `toml:"password"`
 	Stage1Profile string `toml:"stage1_profile"`
+
+	MemoryLimit string `toml:"memory_limit"`
+
+	DockerImage   string   `toml:"docker_image"`
+	DockerVolumes []string `toml:"docker_volumes"`
+	DockerOptions []string `toml:"docker_options"`
 }
 
 // LoadConfig loads configuration
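
Taken together, the global [docker] section and the per-mirror fields decide which isolation hook a provider gets; a sketch of the selection logic as wired up in worker/provider.go below, with every concrete value invented for illustration:

    gCfg := dockerConfig{
        Enable:  true,
        Volumes: []string{"/etc/localtime:/etc/localtime:ro"}, // hypothetical global volume
    }
    mCfg := mirrorConfig{
        DockerImage:   "alpine",                       // hypothetical image
        DockerVolumes: []string{"/srv/keys:/keys:ro"}, // hypothetical per-mirror volume
        MemoryLimit:   "512M",                         // consumed by the cgroup fallback
    }
    if gCfg.Enable && len(mCfg.DockerImage) > 0 {
        provider.AddHook(newDockerHook(provider, gCfg, mCfg)) // docker wins
    } // otherwise the cgroup hook picks up Subsystem and MemoryLimit

Note that newDockerHook appends the global volumes and options first, then the mirror's own, so per-mirror entries come last on the docker command line.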

worker/docker.go (+98 -0)

@@ -0,0 +1,98 @@
+package worker
+
+import (
+	"fmt"
+	"os"
+)
+
+type dockerHook struct {
+	emptyHook
+	provider mirrorProvider
+	image    string
+	volumes  []string
+	options  []string
+}
+
+func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dockerHook {
+	volumes := []string{}
+	volumes = append(volumes, gCfg.Volumes...)
+	volumes = append(volumes, mCfg.DockerVolumes...)
+
+	options := []string{}
+	options = append(options, gCfg.Options...)
+	options = append(options, mCfg.DockerOptions...)
+
+	return &dockerHook{
+		provider: p,
+		image:    mCfg.DockerImage,
+		volumes:  volumes,
+		options:  options,
+	}
+}
+
+func (d *dockerHook) preExec() error {
+	p := d.provider
+	logFile := p.LogFile()
+	workingDir := p.WorkingDir()
+
+	if _, err := os.Stat(workingDir); os.IsNotExist(err) {
+		logger.Debugf("Making dir %s", workingDir)
+		if err = os.MkdirAll(workingDir, 0755); err != nil {
+			return fmt.Errorf("Error making dir %s: %s", workingDir, err.Error())
+		}
+	}
+
+	logFileNew := "/log_latest"
+	workingDirNew := "/data"
+
+	// Override workingDir
+	ctx := p.EnterContext()
+	ctx.Set(_WorkingDirKey, workingDirNew)
+	ctx.Set(_LogFileKey+":docker", logFileNew)
+	ctx.Set(
+		"volumes", []string{
+			fmt.Sprintf("%s:%s", logFile, logFileNew),
+			fmt.Sprintf("%s:%s", workingDir, workingDirNew),
+		},
+	)
+	return nil
+}
+
+func (d *dockerHook) postExec() error {
+	// sh.Command(
+	// 	"docker", "rm", "-f", d.Name(),
+	// ).Run()
+	d.provider.ExitContext()
+	return nil
+}
+
+// Volumes returns the configured volumes and
+// runtime-needed volumes, including mirror dirs
+// and log files
+func (d *dockerHook) Volumes() []string {
+	vols := make([]string, len(d.volumes))
+	copy(vols, d.volumes)
+
+	p := d.provider
+	ctx := p.Context()
+	if ivs, ok := ctx.Get("volumes"); ok {
+		vs := ivs.([]string)
+		vols = append(vols, vs...)
+	}
+	return vols
+}
+
+func (d *dockerHook) LogFile() string {
+	p := d.provider
+	ctx := p.Context()
+	if iv, ok := ctx.Get(_LogFileKey + ":docker"); ok {
+		v := iv.(string)
+		return v
+	}
+	return p.LogFile()
+}
+
+func (d *dockerHook) Name() string {
+	p := d.provider
+	return "tunasync-job-" + p.Name()
+}
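
The context rewiring in preExec is easiest to follow with concrete paths; a worked example, assuming a hypothetical mirror whose working dir is /srv/mirror/ubuntu and whose log file is /var/log/tunasync/ubuntu/latest:

    _ = d.preExec()
    // provider.WorkingDir() -> "/data"
    // d.LogFile()           -> "/log_latest"
    // d.Volumes()           -> the configured volumes, then
    //   "/var/log/tunasync/ubuntu/latest:/log_latest",
    //   "/srv/mirror/ubuntu:/data"
    _ = d.postExec() // ExitContext() pops the overrides, restoring host paths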

worker/docker_test.go (+97 -0)

@@ -0,0 +1,97 @@
+package worker
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/codeskyblue/go-sh"
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+func getDockerByName(name string) (string, error) {
+	// docker ps -f 'name=$name' --format '{{.Names}}'
+	out, err := sh.Command(
+		"docker", "ps",
+		"--filter", "name="+name,
+		"--format", "{{.Names}}",
+	).Output()
+	return string(out), err
+}
+
+func TestDocker(t *testing.T) {
+	Convey("Docker Should Work", t, func(ctx C) {
+		tmpDir, err := ioutil.TempDir("", "tunasync")
+		defer os.RemoveAll(tmpDir)
+		So(err, ShouldBeNil)
+		cmdScript := filepath.Join(tmpDir, "cmd.sh")
+		tmpFile := filepath.Join(tmpDir, "log_file")
+		expectedOutput := "HELLO_WORLD"
+
+		c := cmdConfig{
+			name:        "tuna-docker",
+			upstreamURL: "http://mirrors.tuna.moe/",
+			command:     "/bin/cmd.sh",
+			workingDir:  tmpDir,
+			logDir:      tmpDir,
+			logFile:     tmpFile,
+			interval:    600 * time.Second,
+			env: map[string]string{
+				"TEST_CONTENT": expectedOutput,
+			},
+		}
+
+		cmdScriptContent := `#!/bin/sh
+echo ${TEST_CONTENT}
+sleep 10
+`
+		err = ioutil.WriteFile(cmdScript, []byte(cmdScriptContent), 0755)
+		So(err, ShouldBeNil)
+
+		provider, err := newCmdProvider(c)
+		So(err, ShouldBeNil)
+
+		d := &dockerHook{
+			provider: provider,
+			image:    "alpine",
+			volumes: []string{
+				fmt.Sprintf("%s:%s", cmdScript, "/bin/cmd.sh"),
+			},
+		}
+		provider.AddHook(d)
+		So(provider.Docker(), ShouldNotBeNil)
+
+		err = d.preExec()
+		So(err, ShouldBeNil)
+
+		go func() {
+			err = provider.Run()
+			ctx.So(err, ShouldNotBeNil)
+		}()
+
+		time.Sleep(1 * time.Second)
+
+		// assert container running
+		names, err := getDockerByName(d.Name())
+		So(err, ShouldBeNil)
+		So(names, ShouldEqual, d.Name()+"\n")
+
+		err = provider.Terminate()
+		So(err, ShouldBeNil)
+
+		// container should be terminated and removed
+		names, err = getDockerByName(d.Name())
+		So(err, ShouldBeNil)
+		So(names, ShouldEqual, "")
+
+		// check log content
+		loggedContent, err := ioutil.ReadFile(provider.LogFile())
+		So(err, ShouldBeNil)
+		So(string(loggedContent), ShouldEqual, expectedOutput+"\n")
+
+		d.postExec()
+	})
+}

worker/provider.go (+12 -3)

@@ -38,6 +38,8 @@ type mirrorProvider interface {
 	Cgroup() *cgroupHook
 	// ZFS
 	ZFS() *zfsHook
+	// Docker
+	Docker() *dockerHook
 
 	AddHook(hook jobHook)
 	Hooks() []jobHook
@@ -169,10 +171,17 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 		provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool))
 	}
 
-	// Add Cgroup Hook
-	if cfg.Cgroup.Enable {
+	// Add Docker Hook
+	if cfg.Docker.Enable && len(mirror.DockerImage) > 0 {
+		provider.AddHook(newDockerHook(provider, cfg.Docker, mirror))
+
+	} else if cfg.Cgroup.Enable {
+		// Add Cgroup Hook
 		provider.AddHook(
-			newCgroupHook(provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group),
+			newCgroupHook(
+				provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group,
+				cfg.Cgroup.Subsystem, mirror.MemoryLimit,
+			),
 		)
 	}
 

worker/runner.go (+51 -9)

@@ -2,6 +2,7 @@ package worker
 
 import (
 	"errors"
+	"fmt"
 	"os"
 	"os/exec"
 	"strings"
@@ -9,6 +10,7 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/codeskyblue/go-sh"
 	"golang.org/x/sys/unix"
 )
 
@@ -31,11 +33,40 @@ type cmdJob struct {
 func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob {
 	var cmd *exec.Cmd
 
-	if provider.Cgroup() != nil {
+	if d := provider.Docker(); d != nil {
+		c := "docker"
+		args := []string{
+			"run", "--rm",
+			"-a", "STDOUT", "-a", "STDERR",
+			"--name", d.Name(),
+			"-w", workingDir,
+		}
+		// add volumes
+		for _, vol := range d.Volumes() {
+			logger.Debugf("volume: %s", vol)
+			args = append(args, "-v", vol)
+		}
+		// set env
+		env["TUNASYNC_LOG_FILE"] = d.LogFile()
+		for k, v := range env {
+			kv := fmt.Sprintf("%s=%s", k, v)
+			args = append(args, "-e", kv)
+		}
+		// apply options
+		args = append(args, d.options...)
+		// apply image
+		args = append(args, d.image)
+		// apply command
+		args = append(args, cmdAndArgs...)
+
+		cmd = exec.Command(c, args...)
+
+	} else if provider.Cgroup() != nil {
 		c := "cgexec"
 		args := []string{"-g", provider.Cgroup().Cgroup()}
 		args = append(args, cmdAndArgs...)
 		cmd = exec.Command(c, args...)
+
 	} else {
 		if len(cmdAndArgs) == 1 {
 			cmd = exec.Command(cmdAndArgs[0])
@@ -48,25 +79,28 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
 		}
 	}
 
-	logger.Debugf("Executing command %s at %s", cmdAndArgs[0], workingDir)
-	if _, err := os.Stat(workingDir); os.IsNotExist(err) {
-		logger.Debugf("Making dir %s", workingDir)
-		if err = os.MkdirAll(workingDir, 0755); err != nil {
-			logger.Errorf("Error making dir %s", workingDir)
+	if provider.Docker() == nil {
+		logger.Debugf("Executing command %s at %s", cmdAndArgs[0], workingDir)
+		if _, err := os.Stat(workingDir); os.IsNotExist(err) {
+			logger.Debugf("Making dir %s", workingDir)
+			if err = os.MkdirAll(workingDir, 0755); err != nil {
+				logger.Errorf("Error making dir %s: %s", workingDir, err.Error())
+			}
 		}
+		cmd.Dir = workingDir
+		cmd.Env = newEnviron(env, true)
 	}
 
-	cmd.Dir = workingDir
-	cmd.Env = newEnviron(env, true)
-
 	return &cmdJob{
 		cmd:        cmd,
 		workingDir: workingDir,
 		env:        env,
+		provider:   provider,
 	}
 }
 
 func (c *cmdJob) Start() error {
+	// logger.Debugf("Command start: %v", c.cmd.Args)
 	c.finished = make(chan empty, 1)
 	return c.cmd.Start()
 }
@@ -95,6 +129,14 @@ func (c *cmdJob) Terminate() error {
 	if c.cmd == nil || c.cmd.Process == nil {
 		return errProcessNotStarted
 	}
+
+	if d := c.provider.Docker(); d != nil {
+		sh.Command(
+			"docker", "stop", "-t", "2", d.Name(),
+		).Run()
+		return nil
+	}
+
 	err := unix.Kill(c.cmd.Process.Pid, syscall.SIGTERM)
 	if err != nil {
 		return err
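
For a docker-enabled provider, newCmdJob therefore assembles a single docker run invocation instead of a cgexec wrapper. A sketch of what the branch above produces for a hypothetical mirror "ubuntu" running /bin/cmd.sh in the alpine image (the actual volumes and env come from dockerHook.Volumes() and the provider's env map):

    cmd := exec.Command("docker",
        "run", "--rm",
        "-a", "STDOUT", "-a", "STDERR",
        "--name", "tunasync-job-ubuntu",
        "-w", "/data",
        "-v", "/var/log/tunasync/ubuntu/latest:/log_latest",
        "-v", "/srv/mirror/ubuntu:/data",
        "-e", "TUNASYNC_LOG_FILE=/log_latest",
        "alpine",
        "/bin/cmd.sh",
    )
    // Terminate() then maps to `docker stop -t 2 tunasync-job-ubuntu`
    // rather than signalling a local process group.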

worker/worker.go (+0 -3)

@@ -55,9 +55,6 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
 		w.httpClient = httpClient
 	}
 
-	if cfg.Cgroup.Enable {
-		initCgroup(cfg.Cgroup.BasePath)
-	}
 	w.initJobs()
 	w.makeHTTPServer()
 	tunasyncWorker = w