Pārlūkot izejas kodu

Merge pull request #30 from tuna/dev

Dev
bigeagle 9 gadi atpakaļ
vecāks
revīzija
4e9c146557

+ 1 - 56
.gitignore

@@ -1,59 +1,4 @@
-# Byte-compiled / optimized / DLL files
-__pycache__/
-*.py[cod]
-
-# C extensions
-*.so
-
-# Distribution / packaging
-.Python
-env/
-build/
-develop-eggs/
-dist/
-downloads/
-eggs/
-lib/
-lib64/
-parts/
-sdist/
-var/
-*.egg-info/
-.installed.cfg
-*.egg
-
-# PyInstaller
-#  Usually these files are written by a python script from a template
-#  before PyInstaller builds the exe, so as to inject date/other infos into it.
-*.manifest
-*.spec
-
-# Installer logs
-pip-log.txt
-pip-delete-this-directory.txt
-
-# Unit test / coverage reports
-htmlcov/
-.tox/
-.coverage
-.cache
-nosetests.xml
-coverage.xml
-
-# Translations
-*.mo
-*.pot
-
-# Django stuff:
-*.log
-
-# Sphinx documentation
-docs/_build/
-
-# PyBuilder
-target/
-
 *.swp
 *~
-/examples/tunasync.json
 /*.cov
+node_modules

+ 1 - 1
.travis.yml

@@ -11,7 +11,7 @@ os:
     - linux
 
 before_script:
-    - sudo cgcreate -t travis -a travis -g cpu:tunasync
+    - sudo cgcreate -t travis -a travis -g memory:tunasync
 
 script:
     - ./.testandcover.bash

+ 1 - 14
README.md

@@ -3,6 +3,7 @@ tunasync
 
 [![Build Status](https://travis-ci.org/tuna/tunasync.svg?branch=dev)](https://travis-ci.org/tuna/tunasync)
 [![Coverage Status](https://coveralls.io/repos/github/tuna/tunasync/badge.svg?branch=dev)](https://coveralls.io/github/tuna/tunasync?branch=dev)
+[![Commitizen friendly](https://img.shields.io/badge/commitizen-friendly-brightgreen.svg)](http://commitizen.github.io/cz-cli/)
 ![GPLv3](https://img.shields.io/badge/license-GPLv3-blue.svg)
 
 ## Design
@@ -41,20 +42,6 @@ PreSyncing           Syncing                               Success
 					  +-----------------+
 ```
 
-## TODO
-
-- [x] split to `tunasync-manager` and `tunasync-worker` instances
-	- [x] use HTTP as communication protocol
-	- [x] implement manager as status server first, and use python worker
-	- [x] implement go worker
-- Web frontend for `tunasync-manager`
-	- [ ] start/stop/restart job
-	- [ ] enable/disable mirror
-	- [ ] view log
-- [ ] config file structure
-	- [ ] support multi-file configuration (`/etc/tunasync.d/mirror-enabled/*.conf`)
-
-
 ## Generate Self-Signed Certificate
 
 Fisrt, create root CA

+ 22 - 6
cmd/tunasync/tunasync.go

@@ -8,6 +8,7 @@ import (
 
 	"github.com/codegangsta/cli"
 	"github.com/gin-gonic/gin"
+	"github.com/pkg/profile"
 	"gopkg.in/op/go-logging.v1"
 
 	tunasync "github.com/tuna/tunasync/internal"
@@ -57,6 +58,20 @@ func startWorker(c *cli.Context) {
 		os.Exit(1)
 	}
 
+	if profPath := c.String("prof-path"); profPath != "" {
+		valid := false
+		if fi, err := os.Stat(profPath); err == nil {
+			if fi.IsDir() {
+				valid = true
+				defer profile.Start(profile.ProfilePath(profPath)).Stop()
+			}
+		}
+		if !valid {
+			logger.Errorf("Invalid profiling path: %s", profPath)
+			os.Exit(1)
+		}
+	}
+
 	go func() {
 		time.Sleep(1 * time.Second)
 		sigChan := make(chan os.Signal, 1)
@@ -70,8 +85,9 @@ func startWorker(c *cli.Context) {
 				newCfg, err := worker.LoadConfig(c.String("config"))
 				if err != nil {
 					logger.Errorf("Error loading config: %s", err.Error())
+				} else {
+					w.ReloadMirrorConfig(newCfg.Mirrors)
 				}
-				w.ReloadMirrorConfig(newCfg.Mirrors)
 			case syscall.SIGINT, syscall.SIGTERM:
 				w.Halt()
 			}
@@ -97,7 +113,6 @@ func main() {
 					Name:  "config, c",
 					Usage: "Load manager configurations from `FILE`",
 				},
-
 				cli.StringFlag{
 					Name:  "addr",
 					Usage: "The manager will listen on `ADDR`",
@@ -126,7 +141,6 @@ func main() {
 					Name:  "db-type",
 					Usage: "Use database type `TYPE`",
 				},
-
 				cli.BoolFlag{
 					Name:  "verbose, v",
 					Usage: "Enable verbose logging",
@@ -139,7 +153,6 @@ func main() {
 					Name:  "with-systemd",
 					Usage: "Enable systemd-compatible logging",
 				},
-
 				cli.StringFlag{
 					Name:  "pidfile",
 					Value: "/run/tunasync/tunasync.manager.pid",
@@ -157,7 +170,6 @@ func main() {
 					Name:  "config, c",
 					Usage: "Load worker configurations from `FILE`",
 				},
-
 				cli.BoolFlag{
 					Name:  "verbose, v",
 					Usage: "Enable verbose logging",
@@ -170,12 +182,16 @@ func main() {
 					Name:  "with-systemd",
 					Usage: "Enable systemd-compatible logging",
 				},
-
 				cli.StringFlag{
 					Name:  "pidfile",
 					Value: "/run/tunasync/tunasync.worker.pid",
 					Usage: "The pid file of the worker process",
 				},
+				cli.StringFlag{
+					Name:  "prof-path",
+					Value: "",
+					Usage: "Go profiling file path",
+				},
 			},
 		},
 	}

+ 78 - 56
cmd/tunasynctl/tunasynctl.go

@@ -20,8 +20,8 @@ const (
 	listWorkersPath = "/workers"
 	cmdPath         = "/cmd"
 
-	systemCfgFile = "/etc/tunasync/ctl.conf"
-	userCfgFile   = "$HOME/.config/tunasync/ctl.conf"
+	systemCfgFile = "/etc/tunasync/ctl.conf"          // system-wide conf
+	userCfgFile   = "$HOME/.config/tunasync/ctl.conf" // user-specific conf
 )
 
 var logger = logging.MustGetLogger("tunasynctl-cmd")
@@ -29,13 +29,13 @@ var logger = logging.MustGetLogger("tunasynctl-cmd")
 var baseURL string
 var client *http.Client
 
-func initializeWrapper(handler func(*cli.Context)) func(*cli.Context) {
-	return func(c *cli.Context) {
+func initializeWrapper(handler cli.ActionFunc) cli.ActionFunc {
+	return func(c *cli.Context) error {
 		err := initialize(c)
 		if err != nil {
-			os.Exit(1)
+			return cli.NewExitError("", 1)
 		}
-		handler(c)
+		return handler(c)
 	}
 }
 
@@ -45,49 +45,49 @@ type config struct {
 	CACert      string `toml:"ca_cert"`
 }
 
-func loadConfig(cfgFile string, c *cli.Context) (*config, error) {
-	cfg := new(config)
-	cfg.ManagerAddr = "localhost"
-	cfg.ManagerPort = 14242
-
+func loadConfig(cfgFile string, cfg *config) error {
 	if cfgFile != "" {
 		if _, err := toml.DecodeFile(cfgFile, cfg); err != nil {
 			logger.Errorf(err.Error())
-			return nil, err
+			return err
 		}
 	}
 
-	if c.String("manager") != "" {
-		cfg.ManagerAddr = c.String("manager")
-	}
-	if c.Int("port") > 0 {
-		cfg.ManagerPort = c.Int("port")
-	}
-
-	if c.String("ca-cert") != "" {
-		cfg.CACert = c.String("ca-cert")
-	}
-	return cfg, nil
+	return nil
 }
 
 func initialize(c *cli.Context) error {
 	// init logger
 	tunasync.InitLogger(c.Bool("verbose"), c.Bool("verbose"), false)
-	var cfgFile string
 
-	// choose config file and load config
+	cfg := new(config)
+
+	// default configs
+	cfg.ManagerAddr = "localhost"
+	cfg.ManagerPort = 14242
+
+	// find config file and load config
+	if _, err := os.Stat(systemCfgFile); err == nil {
+		loadConfig(systemCfgFile, cfg)
+	}
+	fmt.Println(os.ExpandEnv(userCfgFile))
+	if _, err := os.Stat(os.ExpandEnv(userCfgFile)); err == nil {
+		loadConfig(os.ExpandEnv(userCfgFile), cfg)
+	}
 	if c.String("config") != "" {
-		cfgFile = c.String("config")
-	} else if _, err := os.Stat(os.ExpandEnv(userCfgFile)); err == nil {
-		cfgFile = os.ExpandEnv(userCfgFile)
-	} else if _, err := os.Stat(systemCfgFile); err == nil {
-		cfgFile = systemCfgFile
+		loadConfig(c.String("config"), cfg)
 	}
-	cfg, err := loadConfig(cfgFile, c)
 
-	if err != nil {
-		logger.Errorf("Load configuration for tunasynctl error: %s", err.Error())
-		return err
+	// override config using the command-line arguments
+	if c.String("manager") != "" {
+		cfg.ManagerAddr = c.String("manager")
+	}
+	if c.Int("port") > 0 {
+		cfg.ManagerPort = c.Int("port")
+	}
+
+	if c.String("ca-cert") != "" {
+		cfg.CACert = c.String("ca-cert")
 	}
 
 	// parse base url of the manager server
@@ -97,6 +97,7 @@ func initialize(c *cli.Context) error {
 	logger.Infof("Use manager address: %s", baseURL)
 
 	// create HTTP client
+	var err error
 	client, err = tunasync.CreateHTTPClient(cfg.CACert)
 	if err != nil {
 		err = fmt.Errorf("Error initializing HTTP client: %s", err.Error())
@@ -107,44 +108,54 @@ func initialize(c *cli.Context) error {
 	return nil
 }
 
-func listWorkers(c *cli.Context) {
+func listWorkers(c *cli.Context) error {
 	var workers []tunasync.WorkerStatus
 	_, err := tunasync.GetJSON(baseURL+listWorkersPath, &workers, client)
 	if err != nil {
-		logger.Errorf("Filed to correctly get informations from manager server: %s", err.Error())
-		os.Exit(1)
+		return cli.NewExitError(
+			fmt.Sprintf("Filed to correctly get informations from"+
+				"manager server: %s", err.Error()), 1)
 	}
 
 	b, err := json.MarshalIndent(workers, "", "  ")
 	if err != nil {
-		logger.Errorf("Error printing out informations: %s", err.Error())
+		return cli.NewExitError(
+			fmt.Sprintf("Error printing out informations: %s",
+				err.Error()),
+			1)
 	}
 	fmt.Print(string(b))
+	return nil
 }
 
-func listJobs(c *cli.Context) {
+func listJobs(c *cli.Context) error {
 	// FIXME: there should be an API on manager server side that return MirrorStatus list to tunasynctl
 	var jobs []tunasync.MirrorStatus
 	if c.Bool("all") {
 		_, err := tunasync.GetJSON(baseURL+listJobsPath, &jobs, client)
 		if err != nil {
-			logger.Errorf("Filed to correctly get information of all jobs from manager server: %s", err.Error())
-			os.Exit(1)
+			return cli.NewExitError(
+				fmt.Sprintf("Failed to correctly get information "+
+					"of all jobs from manager server: %s", err.Error()),
+				1)
 		}
 
 	} else {
 		args := c.Args()
 		if len(args) == 0 {
-			logger.Error("Usage Error: jobs command need at least one arguments or \"--all\" flag.")
-			os.Exit(1)
+			return cli.NewExitError(
+				fmt.Sprintf("Usage Error: jobs command need at"+
+					" least one arguments or \"--all\" flag."), 1)
 		}
 		ans := make(chan []tunasync.MirrorStatus, len(args))
 		for _, workerID := range args {
 			go func(workerID string) {
 				var workerJobs []tunasync.MirrorStatus
-				_, err := tunasync.GetJSON(fmt.Sprintf("%s/workers/%s/jobs", baseURL, workerID), &workerJobs, client)
+				_, err := tunasync.GetJSON(fmt.Sprintf("%s/workers/%s/jobs",
+					baseURL, workerID), &workerJobs, client)
 				if err != nil {
-					logger.Errorf("Filed to correctly get jobs for worker %s: %s", workerID, err.Error())
+					logger.Errorf("Filed to correctly get jobs"+
+						" for worker %s: %s", workerID, err.Error())
 				}
 				ans <- workerJobs
 			}(workerID)
@@ -156,13 +167,16 @@ func listJobs(c *cli.Context) {
 
 	b, err := json.MarshalIndent(jobs, "", "  ")
 	if err != nil {
-		logger.Errorf("Error printing out informations: %s", err.Error())
+		return cli.NewExitError(
+			fmt.Sprintf("Error printing out informations: %s", err.Error()),
+			1)
 	}
 	fmt.Printf(string(b))
+	return nil
 }
 
-func cmdJob(cmd tunasync.CmdVerb) func(*cli.Context) {
-	return func(c *cli.Context) {
+func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
+	return func(c *cli.Context) error {
 		var mirrorID string
 		var argsList []string
 		if len(c.Args()) == 1 {
@@ -173,8 +187,9 @@ func cmdJob(cmd tunasync.CmdVerb) func(*cli.Context) {
 				argsList = append(argsList, strings.TrimSpace(arg))
 			}
 		} else {
-			logger.Error("Usage Error: cmd command receive just 1 required positional argument MIRROR and 1 optional ")
-			os.Exit(1)
+			return cli.NewExitError("Usage Error: cmd command receive just "+
+				"1 required positional argument MIRROR and 1 optional "+
+				"argument WORKER", 1)
 		}
 
 		cmd := tunasync.ClientCmd{
@@ -185,21 +200,28 @@ func cmdJob(cmd tunasync.CmdVerb) func(*cli.Context) {
 		}
 		resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client)
 		if err != nil {
-			logger.Errorf("Failed to correctly send command: %s", err.Error())
-			os.Exit(1)
+			return cli.NewExitError(
+				fmt.Sprintf("Failed to correctly send command: %s",
+					err.Error()),
+				1)
 		}
 		defer resp.Body.Close()
 
 		if resp.StatusCode != http.StatusOK {
 			body, err := ioutil.ReadAll(resp.Body)
 			if err != nil {
-				logger.Errorf("Failed to parse response: %s", err.Error())
+				return cli.NewExitError(
+					fmt.Sprintf("Failed to parse response: %s", err.Error()),
+					1)
 			}
 
-			logger.Errorf("Failed to correctly send command: HTTP status code is not 200: %s", body)
-		} else {
-			logger.Info("Succesfully send command")
+			return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
+				" command: HTTP status code is not 200: %s", body),
+				1)
 		}
+		logger.Info("Succesfully send command")
+
+		return nil
 	}
 }
 

+ 13 - 0
package.json

@@ -0,0 +1,13 @@
+{
+  "name": "tunasync",
+  "version": "1.0.0b1",
+  "description": "This is not a node project!",
+  "devDependencies": {
+    "cz-conventional-changelog": "^1.1.6"
+  },
+  "config": {
+    "commitizen": {
+      "path": "cz-conventional-changelog"
+    }
+  }
+}

+ 2 - 2
systemd/tunasync-worker.service

@@ -6,10 +6,10 @@ After=network.target
 Type=simple
 User=tunasync
 PermissionsStartOnly=true
-ExecStartPre=/usr/bin/cgcreate -t tunasync -a tunasync -g cpu:tunasync
+ExecStartPre=/usr/bin/cgcreate -t tunasync -a tunasync -g memory:tunasync
 ExecStart=/home/bin/tunasync worker -c /etc/tunasync/worker.conf --with-systemd
 ExecReload=/bin/kill -SIGHUP $MAINPID
-ExecStopPost=/usr/bin/cgdelete cpu:tunasync
+ExecStopPost=/usr/bin/cgdelete memory:tunasync
 
 [Install]
 WantedBy=multi-user.target

+ 25 - 3
worker/cgroup.go

@@ -13,6 +13,8 @@ import (
 	"github.com/codeskyblue/go-sh"
 )
 
+var cgSubsystem string = "cpu"
+
 type cgroupHook struct {
 	emptyHook
 	provider  mirrorProvider
@@ -21,6 +23,14 @@ type cgroupHook struct {
 	created   bool
 }
 
+func initCgroup(basePath string) {
+	if _, err := os.Stat(filepath.Join(basePath, "memory")); err == nil {
+		cgSubsystem = "memory"
+		return
+	}
+	logger.Warning("Memory subsystem of cgroup not enabled, fallback to cpu")
+}
+
 func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook {
 	if basePath == "" {
 		basePath = "/sys/fs/cgroup"
@@ -37,7 +47,19 @@ func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook {
 
 func (c *cgroupHook) preExec() error {
 	c.created = true
-	return sh.Command("cgcreate", "-g", c.Cgroup()).Run()
+	if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil {
+		return err
+	}
+	if cgSubsystem != "memory" {
+		return nil
+	}
+	if c.provider.Type() == provRsync || c.provider.Type() == provTwoStageRsync {
+		gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name())
+		return sh.Command(
+			"cgset", "-r", "memory.limit_in_bytes=128M", gname,
+		).Run()
+	}
+	return nil
 }
 
 func (c *cgroupHook) postExec() error {
@@ -52,7 +74,7 @@ func (c *cgroupHook) postExec() error {
 
 func (c *cgroupHook) Cgroup() string {
 	name := c.provider.Name()
-	return fmt.Sprintf("cpu:%s/%s", c.baseGroup, name)
+	return fmt.Sprintf("%s:%s/%s", cgSubsystem, c.baseGroup, name)
 }
 
 func (c *cgroupHook) killAll() error {
@@ -60,7 +82,7 @@ func (c *cgroupHook) killAll() error {
 		return nil
 	}
 	name := c.provider.Name()
-	taskFile, err := os.Open(filepath.Join(c.basePath, "cpu", c.baseGroup, name, "tasks"))
+	taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks"))
 	if err != nil {
 		return err
 	}

+ 36 - 0
worker/cgroup_test.go

@@ -4,6 +4,7 @@ import (
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -71,6 +72,7 @@ sleep 30
 		provider, err := newCmdProvider(c)
 		So(err, ShouldBeNil)
 
+		initCgroup("/sys/fs/cgroup")
 		cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
 		provider.AddHook(cg)
 
@@ -105,4 +107,38 @@ sleep 30
 		So(os.IsNotExist(err), ShouldBeTrue)
 
 	})
+
+	Convey("Rsync Memory Should Be Limited", t, func() {
+		tmpDir, err := ioutil.TempDir("", "tunasync")
+		defer os.RemoveAll(tmpDir)
+		So(err, ShouldBeNil)
+		scriptFile := filepath.Join(tmpDir, "myrsync")
+		tmpFile := filepath.Join(tmpDir, "log_file")
+
+		c := rsyncConfig{
+			name:        "tuna-cgroup",
+			upstreamURL: "rsync://rsync.tuna.moe/tuna/",
+			rsyncCmd:    scriptFile,
+			workingDir:  tmpDir,
+			logDir:      tmpDir,
+			logFile:     tmpFile,
+			useIPv6:     true,
+			interval:    600 * time.Second,
+		}
+
+		provider, err := newRsyncProvider(c)
+		So(err, ShouldBeNil)
+
+		initCgroup("/sys/fs/cgroup")
+		cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
+		provider.AddHook(cg)
+
+		cg.preExec()
+		if cgSubsystem == "memory" {
+			memoLimit, err := ioutil.ReadFile(filepath.Join(cg.basePath, "memory", cg.baseGroup, provider.Name(), "memory.limit_in_bytes"))
+			So(err, ShouldBeNil)
+			So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(128*1024*1024))
+		}
+		cg.postExec()
+	})
 }

+ 4 - 0
worker/cmd_provider.go

@@ -44,6 +44,10 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
 	return provider, nil
 }
 
+func (p *cmdProvider) Type() providerEnum {
+	return provCommand
+}
+
 func (p *cmdProvider) Upstream() string {
 	return p.upstreamURL
 }

+ 2 - 2
worker/provider.go

@@ -10,8 +10,6 @@ import (
 
 // mirror provider is the wrapper of mirror jobs
 
-type providerType uint8
-
 const (
 	_WorkingDirKey = "working_dir"
 	_LogDirKey     = "log_dir"
@@ -24,6 +22,8 @@ type mirrorProvider interface {
 	Name() string
 	Upstream() string
 
+	Type() providerEnum
+
 	// run mirror job in background
 	Run() error
 	// run mirror job in background

+ 3 - 0
worker/provider_test.go

@@ -33,6 +33,7 @@ func TestRsyncProvider(t *testing.T) {
 		provider, err := newRsyncProvider(c)
 		So(err, ShouldBeNil)
 
+		So(provider.Type(), ShouldEqual, provRsync)
 		So(provider.Name(), ShouldEqual, c.name)
 		So(provider.WorkingDir(), ShouldEqual, c.workingDir)
 		So(provider.LogDir(), ShouldEqual, c.logDir)
@@ -126,6 +127,7 @@ func TestCmdProvider(t *testing.T) {
 		provider, err := newCmdProvider(c)
 		So(err, ShouldBeNil)
 
+		So(provider.Type(), ShouldEqual, provCommand)
 		So(provider.Name(), ShouldEqual, c.name)
 		So(provider.WorkingDir(), ShouldEqual, c.workingDir)
 		So(provider.LogDir(), ShouldEqual, c.logDir)
@@ -218,6 +220,7 @@ func TestTwoStageRsyncProvider(t *testing.T) {
 		provider, err := newTwoStageRsyncProvider(c)
 		So(err, ShouldBeNil)
 
+		So(provider.Type(), ShouldEqual, provTwoStageRsync)
 		So(provider.Name(), ShouldEqual, c.name)
 		So(provider.WorkingDir(), ShouldEqual, c.workingDir)
 		So(provider.LogDir(), ShouldEqual, c.logDir)

+ 4 - 0
worker/rsync_provider.go

@@ -63,6 +63,10 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
 	return provider, nil
 }
 
+func (p *rsyncProvider) Type() providerEnum {
+	return provRsync
+}
+
 func (p *rsyncProvider) Upstream() string {
 	return p.upstreamURL
 }

+ 4 - 0
worker/two_stage_rsync_provider.go

@@ -70,6 +70,10 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er
 	return provider, nil
 }
 
+func (p *twoStageRsyncProvider) Type() providerEnum {
+	return provTwoStageRsync
+}
+
 func (p *twoStageRsyncProvider) Upstream() string {
 	return p.upstreamURL
 }

+ 3 - 0
worker/worker.go

@@ -53,6 +53,9 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
 		w.httpClient = httpClient
 	}
 
+	if cfg.Cgroup.Enable {
+		initCgroup(cfg.Cgroup.BasePath)
+	}
 	w.initJobs()
 	w.makeHTTPServer()
 	tunasyncWorker = w