
feature(worker): ability to hot reload mirror job configurations, close #18

bigeagle committed 9 years ago
parent commit 51fa12900d

+ 22 - 1
cmd/tunasync/tunasync.go

@@ -2,6 +2,9 @@ package main
 
 import (
 	"os"
+	"os/signal"
+	"syscall"
+	"time"
 
 	"github.com/codegangsta/cli"
 	"github.com/gin-gonic/gin"
@@ -12,7 +15,7 @@ import (
 	"github.com/tuna/tunasync/worker"
 )
 
-var logger = logging.MustGetLogger("tunasync-cmd")
+var logger = logging.MustGetLogger("tunasync")
 
 func startManager(c *cli.Context) {
 	tunasync.InitLogger(c.Bool("verbose"), c.Bool("debug"), c.Bool("with-systemd"))
@@ -54,6 +57,24 @@ func startWorker(c *cli.Context) {
 		os.Exit(1)
 	}
 
+	go func() {
+		time.Sleep(1 * time.Second)
+		sigChan := make(chan os.Signal, 1)
+		signal.Notify(sigChan, syscall.SIGHUP)
+		for {
+			s := <-sigChan
+			switch s {
+			case syscall.SIGHUP:
+				logger.Info("Received reload signal")
+				newCfg, err := worker.LoadConfig(c.String("config"))
+				if err != nil {
+					logger.Errorf("Error loading config: %s", err.Error())
+					continue
+				}
+				w.ReloadMirrorConfig(newCfg.Mirrors)
+			}
+		}
+	}()
+
 	logger.Info("Run tunasync worker.")
 	w.Run()
 }
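
With this handler in place, a running worker can be told to re-read its
mirror list without a restart, e.g. via kill -HUP <worker-pid>. Below is a
minimal Go sketch of an operator-side helper; the PID lookup is an
assumption, not part of this commit:

// Hypothetical helper: deliver SIGHUP to a running tunasync worker so
// that it reloads the [[mirrors]] section of its config file.
package main

import (
	"os"
	"syscall"
)

func reloadWorker(pid int) error {
	proc, err := os.FindProcess(pid) // always succeeds on Unix
	if err != nil {
		return err
	}
	return proc.Signal(syscall.SIGHUP) // handled by the goroutine above
}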

+ 158 - 0
worker/base_provider.go

@@ -0,0 +1,158 @@
+package worker
+
+import (
+	"os"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// baseProvider is the base mixin of providers
+
+type baseProvider struct {
+	sync.Mutex
+
+	ctx      *Context
+	name     string
+	interval time.Duration
+	isMaster bool
+
+	cmd       *cmdJob
+	isRunning atomic.Value
+
+	logFile *os.File
+
+	cgroup *cgroupHook
+	hooks  []jobHook
+}
+
+func (p *baseProvider) Name() string {
+	return p.name
+}
+
+func (p *baseProvider) EnterContext() *Context {
+	p.ctx = p.ctx.Enter()
+	return p.ctx
+}
+
+func (p *baseProvider) ExitContext() *Context {
+	p.ctx, _ = p.ctx.Exit()
+	return p.ctx
+}
+
+func (p *baseProvider) Context() *Context {
+	return p.ctx
+}
+
+func (p *baseProvider) Interval() time.Duration {
+	// logger.Debug("interval for %s: %v", p.Name(), p.interval)
+	return p.interval
+}
+
+func (p *baseProvider) IsMaster() bool {
+	return p.isMaster
+}
+
+func (p *baseProvider) WorkingDir() string {
+	if v, ok := p.ctx.Get(_WorkingDirKey); ok {
+		if s, ok := v.(string); ok {
+			return s
+		}
+	}
+	panic("working dir is impossible to be non-exist")
+}
+
+func (p *baseProvider) LogDir() string {
+	if v, ok := p.ctx.Get(_LogDirKey); ok {
+		if s, ok := v.(string); ok {
+			return s
+		}
+	}
+	panic("log dir is impossible to be unavailable")
+}
+
+func (p *baseProvider) LogFile() string {
+	if v, ok := p.ctx.Get(_LogFileKey); ok {
+		if s, ok := v.(string); ok {
+			return s
+		}
+	}
+	panic("log dir is impossible to be unavailable")
+}
+
+func (p *baseProvider) AddHook(hook jobHook) {
+	if cg, ok := hook.(*cgroupHook); ok {
+		p.cgroup = cg
+	}
+	p.hooks = append(p.hooks, hook)
+}
+
+func (p *baseProvider) Hooks() []jobHook {
+	return p.hooks
+}
+
+func (p *baseProvider) Cgroup() *cgroupHook {
+	return p.cgroup
+}
+
+func (p *baseProvider) prepareLogFile() error {
+	if p.LogFile() == "/dev/null" {
+		p.cmd.SetLogFile(nil)
+		return nil
+	}
+	if p.logFile == nil {
+		logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
+		if err != nil {
+			logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
+			return err
+		}
+		p.logFile = logFile
+	}
+	p.cmd.SetLogFile(p.logFile)
+	return nil
+}
+
+func (p *baseProvider) Run() error {
+	panic("Not Implemented")
+}
+
+func (p *baseProvider) Start() error {
+	panic("Not Implemented")
+}
+
+func (p *baseProvider) IsRunning() bool {
+	isRunning, _ := p.isRunning.Load().(bool)
+	return isRunning
+}
+
+func (p *baseProvider) Wait() error {
+	defer func() {
+		p.Lock()
+		p.isRunning.Store(false)
+		if p.logFile != nil {
+			p.logFile.Close()
+			p.logFile = nil
+		}
+		p.Unlock()
+	}()
+	return p.cmd.Wait()
+}
+
+func (p *baseProvider) Terminate() error {
+	logger.Debugf("terminating provider: %s", p.Name())
+	if !p.IsRunning() {
+		return nil
+	}
+
+	p.Lock()
+	if p.logFile != nil {
+		p.logFile.Close()
+		p.logFile = nil
+	}
+	p.Unlock()
+
+	err := p.cmd.Terminate()
+	p.isRunning.Store(false)
+
+	return err
+}
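
baseProvider is meant to be embedded: concrete providers override Run and
Start while inheriting the context, hook, and log-file plumbing. A skeletal
example, illustrative only and not part of this commit:

// demoProvider embeds baseProvider and supplies its own Run.
type demoProvider struct {
	baseProvider
}

func (p *demoProvider) Run() error {
	p.isRunning.Store(true)
	defer p.isRunning.Store(false)
	// a real provider would start and wait for its sync command here
	return nil
}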

+ 10 - 10
worker/config.go

@@ -7,30 +7,30 @@ import (
 	"github.com/BurntSushi/toml"
 )
 
-type ProviderEnum uint8
+type providerEnum uint8
 
 const (
-	ProvRsync ProviderEnum = iota
-	ProvTwoStageRsync
-	ProvCommand
+	provRsync providerEnum = iota
+	provTwoStageRsync
+	provCommand
 )
 
-func (p *ProviderEnum) UnmarshalText(text []byte) error {
+func (p *providerEnum) UnmarshalText(text []byte) error {
 	s := string(text)
 	switch s {
 	case `command`:
-		*p = ProvCommand
+		*p = provCommand
 	case `rsync`:
-		*p = ProvRsync
+		*p = provRsync
 	case `two-stage-rsync`:
-		*p = ProvTwoStageRsync
+		*p = provTwoStageRsync
 	default:
 		return errors.New("Invalid value to provierEnum")
 	}
 	return nil
-
 }
 
+// Config stores worker configuration options
 type Config struct {
 	Global  globalConfig   `toml:"global"`
 	Manager managerConfig  `toml:"manager"`
@@ -69,7 +69,7 @@ type cgroupConfig struct {
 
 type mirrorConfig struct {
 	Name      string            `toml:"name"`
-	Provider  ProviderEnum      `toml:"provider"`
+	Provider  providerEnum      `toml:"provider"`
 	Upstream  string            `toml:"upstream"`
 	Interval  int               `toml:"interval"`
 	MirrorDir string            `toml:"mirror_dir"`
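
Because providerEnum implements encoding.TextUnmarshaler, BurntSushi/toml
decodes the provider field directly from its string form. A minimal sketch;
the inline TOML literal is illustrative:

// Illustrative decode: "rsync" below ends up as provRsync.
var cfg Config
raw := `
[[mirrors]]
name = "fedora"
provider = "rsync"
upstream = "rsync://example.org/fedora/"
`
if _, err := toml.Decode(raw, &cfg); err != nil {
	panic(err)
}
// now cfg.Mirrors[0].Provider == provRsync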

+ 88 - 0
worker/config_diff.go

@@ -0,0 +1,88 @@
+package worker
+
+import (
+	"fmt"
+	"reflect"
+	"sort"
+)
+
+// Find the difference between mirror configs; this is needed for
+// hot-reloading the config file
+// NOTICE: only the [[mirrors]] section is supported
+
+// make []mirrorConfig sortable
+type sortableMirrorList []mirrorConfig
+
+func (l sortableMirrorList) Len() int           { return len(l) }
+func (l sortableMirrorList) Swap(i, j int)      { l[i], l[j] = l[j], l[i] }
+func (l sortableMirrorList) Less(i, j int) bool { return l[i].Name < l[j].Name }
+
+const (
+	diffDelete uint8 = iota
+	diffAdd
+	diffModify
+)
+
+// a unit of mirror config difference
+type mirrorCfgTrans struct {
+	diffOp uint8
+	mirCfg mirrorConfig
+}
+
+func (t mirrorCfgTrans) String() string {
+	var op string
+	switch t.diffOp {
+	case diffDelete:
+		op = "Del"
+	case diffAdd:
+		op = "Add"
+	default:
+		op = "Mod"
+	}
+	return fmt.Sprintf("{%s, %s}", op, t.mirCfg.Name)
+}
+
+// diffMirrorConfig finds the difference between oldList and newList;
+// it returns a series of operations that, when applied to oldList,
+// produce a list equivalent to newList.
+func diffMirrorConfig(oldList, newList []mirrorConfig) []mirrorCfgTrans {
+	operations := []mirrorCfgTrans{}
+
+	oList := make([]mirrorConfig, len(oldList))
+	nList := make([]mirrorConfig, len(newList))
+	copy(oList, oldList)
+	copy(nList, newList)
+
+	// first ensure oldList and newList are sorted
+	sort.Sort(sortableMirrorList(oList))
+	sort.Sort(sortableMirrorList(nList))
+
+	// insert a tail node to both lists
+	// as the maximum node
+	lastOld, lastNew := oList[len(oList)-1], nList[len(nList)-1]
+	maxName := lastOld.Name
+	if lastNew.Name > lastOld.Name {
+		maxName = lastNew.Name
+	}
+	Nil := mirrorConfig{Name: "~" + maxName}
+	if Nil.Name <= maxName {
+		panic("Nil.Name should be larger than maxName")
+	}
+	oList, nList = append(oList, Nil), append(nList, Nil)
+
+	// iterate over both lists to find the difference
+	for i, j := 0, 0; i < len(oList) && j < len(nList); {
+		o, n := oList[i], nList[j]
+		if n.Name < o.Name {
+			operations = append(operations, mirrorCfgTrans{diffAdd, n})
+			j++
+		} else if o.Name < n.Name {
+			operations = append(operations, mirrorCfgTrans{diffDelete, o})
+			i++
+		} else {
+			if !reflect.DeepEqual(o, n) {
+				operations = append(operations, mirrorCfgTrans{diffModify, n})
+			}
+			i++
+			j++
+		}
+	}
+
+	return operations
+}
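
A minimal usage sketch of the diff above (names are from this commit, the
list contents are illustrative):

oldList := []mirrorConfig{{Name: "debian"}, {Name: "fedora"}}
newList := []mirrorConfig{{Name: "debian"}, {Name: "ubuntu"}}
for _, op := range diffMirrorConfig(oldList, newList) {
	switch op.diffOp {
	case diffDelete: // {Del, fedora}
		logger.Noticef("delete %s", op.mirCfg.Name)
	case diffAdd: // {Add, ubuntu}
		logger.Noticef("add %s", op.mirCfg.Name)
	case diffModify: // same Name, changed fields
		logger.Noticef("modify %s", op.mirCfg.Name)
	}
}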

+ 73 - 0
worker/config_diff_test.go

@@ -0,0 +1,73 @@
+package worker
+
+import (
+	"sort"
+	"testing"
+
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+func TestConfigDiff(t *testing.T) {
+	Convey("When old and new configs are equal", t, func() {
+		oldList := []mirrorConfig{
+			mirrorConfig{Name: "debian"},
+			mirrorConfig{Name: "debian-security"},
+			mirrorConfig{Name: "fedora"},
+			mirrorConfig{Name: "archlinux"},
+			mirrorConfig{Name: "AOSP"},
+			mirrorConfig{Name: "ubuntu"},
+		}
+		newList := make([]mirrorConfig, len(oldList))
+		copy(newList, oldList)
+
+		difference := diffMirrorConfig(oldList, newList)
+		So(len(difference), ShouldEqual, 0)
+	})
+	Convey("When giving two config lists with different names", t, func() {
+		oldList := []mirrorConfig{
+			mirrorConfig{Name: "debian"},
+			mirrorConfig{Name: "debian-security"},
+			mirrorConfig{Name: "fedora"},
+			mirrorConfig{Name: "archlinux"},
+			mirrorConfig{Name: "AOSP", Env: map[string]string{"REPO": "/usr/bin/repo"}},
+			mirrorConfig{Name: "ubuntu"},
+		}
+		newList := []mirrorConfig{
+			mirrorConfig{Name: "debian"},
+			mirrorConfig{Name: "debian-cd"},
+			mirrorConfig{Name: "archlinuxcn"},
+			mirrorConfig{Name: "AOSP", Env: map[string]string{"REPO": "/usr/local/bin/aosp-repo"}},
+			mirrorConfig{Name: "ubuntu-ports"},
+		}
+
+		difference := diffMirrorConfig(oldList, newList)
+
+		sort.Sort(sortableMirrorList(oldList))
+		emptyList := []mirrorConfig{}
+
+		for _, o := range oldList {
+			keep := true
+			for _, op := range difference {
+				if (op.diffOp == diffDelete || op.diffOp == diffModify) &&
+					op.mirCfg.Name == o.Name {
+
+					keep = false
+					break
+				}
+			}
+			if keep {
+				emptyList = append(emptyList, o)
+			}
+		}
+
+		for _, op := range difference {
+			if op.diffOp == diffAdd || op.diffOp == diffModify {
+				emptyList = append(emptyList, op.mirCfg)
+			}
+		}
+		sort.Sort(sortableMirrorList(emptyList))
+		sort.Sort(sortableMirrorList(newList))
+		So(emptyList, ShouldResemble, newList)
+
+	})
+}

+ 10 - 11
worker/config_test.go

@@ -82,19 +82,19 @@ exec_on_failure = "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKIN
 		m := cfg.Mirrors[0]
 		So(m.Name, ShouldEqual, "AOSP")
 		So(m.MirrorDir, ShouldEqual, "/data/git/AOSP")
-		So(m.Provider, ShouldEqual, ProvCommand)
+		So(m.Provider, ShouldEqual, provCommand)
 		So(m.Interval, ShouldEqual, 720)
 		So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo")
 
 		m = cfg.Mirrors[1]
 		So(m.Name, ShouldEqual, "debian")
 		So(m.MirrorDir, ShouldEqual, "")
-		So(m.Provider, ShouldEqual, ProvTwoStageRsync)
+		So(m.Provider, ShouldEqual, provTwoStageRsync)
 
 		m = cfg.Mirrors[2]
 		So(m.Name, ShouldEqual, "fedora")
 		So(m.MirrorDir, ShouldEqual, "")
-		So(m.Provider, ShouldEqual, ProvRsync)
+		So(m.Provider, ShouldEqual, provRsync)
 		So(m.ExcludeFile, ShouldEqual, "/etc/tunasync.d/fedora-exclude.txt")
 
 		So(len(cfg.Mirrors), ShouldEqual, 3)
@@ -112,14 +112,13 @@ exec_on_failure = "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKIN
 		cfg, err := LoadConfig(tmpfile.Name())
 		So(err, ShouldBeNil)
 
-		w := &Worker{
-			cfg:       cfg,
-			providers: make(map[string]mirrorProvider),
+		providers := map[string]mirrorProvider{}
+		for _, m := range cfg.Mirrors {
+			p := newMirrorProvider(m, cfg)
+			providers[p.Name()] = p
 		}
 
-		w.initProviders()
-
-		p := w.providers["AOSP"]
+		p := providers["AOSP"]
 		So(p.Name(), ShouldEqual, "AOSP")
 		So(p.LogDir(), ShouldEqual, "/var/log/tunasync/AOSP")
 		So(p.LogFile(), ShouldEqual, "/var/log/tunasync/AOSP/latest.log")
@@ -132,7 +131,7 @@ exec_on_failure = "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKIN
 			}
 		}
 
-		p = w.providers["debian"]
+		p = providers["debian"]
 		So(p.Name(), ShouldEqual, "debian")
 		So(p.LogDir(), ShouldEqual, "/var/log/tunasync/debian")
 		So(p.LogFile(), ShouldEqual, "/var/log/tunasync/debian/latest.log")
@@ -141,7 +140,7 @@ exec_on_failure = "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKIN
 		So(r2p.stage1Profile, ShouldEqual, "debian")
 		So(r2p.WorkingDir(), ShouldEqual, "/data/mirrors/debian")
 
-		p = w.providers["fedora"]
+		p = providers["fedora"]
 		So(p.Name(), ShouldEqual, "fedora")
 		So(p.LogDir(), ShouldEqual, "/var/log/tunasync/fedora")
 		So(p.LogFile(), ShouldEqual, "/var/log/tunasync/fedora/latest.log")

+ 11 - 2
worker/job.go

@@ -65,6 +65,15 @@ func (m *mirrorJob) SetState(state uint32) {
 	atomic.StoreUint32(&(m.state), state)
 }
 
+func (m *mirrorJob) SetProvider(provider mirrorProvider) error {
+	s := m.State()
+	if (s != stateNone) && (s != stateDisabled) {
+		return fmt.Errorf("Provider cannot be switched when job state is %d", s)
+	}
+	m.provider = provider
+	return nil
+}
+
 // runMirrorJob is the goroutine in which the syncing job runs
 // arguments:
 //    provider: mirror provider object
@@ -165,7 +174,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 			if syncErr == nil {
 				// syncing success
 				logger.Noticef("succeeded syncing %s", m.Name())
-				managerChan <- jobMessage{tunasync.Success, m.Name(), "", true}
+				managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
 				// post-success hooks
 				err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
 				if err != nil {
@@ -177,7 +186,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 
 			// syncing failed
 			logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
-			managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), retry == maxRetry-1}
+			managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)}
 
 			// post-fail hooks
 			logger.Debug("post-fail hooks")

+ 118 - 131
worker/provider.go

@@ -1,9 +1,10 @@
 package worker
 
 import (
-	"os"
-	"sync"
-	"sync/atomic"
+	"bytes"
+	"errors"
+	"html/template"
+	"path/filepath"
 	"time"
 )
 
@@ -54,150 +55,136 @@ type mirrorProvider interface {
 	Context() *Context
 }
 
-type baseProvider struct {
-	sync.Mutex
+// newMirrorProvider creates a mirrorProvider instance
+// from a mirrorConfig and the global Config
+func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 
-	ctx      *Context
-	name     string
-	interval time.Duration
-	isMaster bool
-
-	cmd       *cmdJob
-	isRunning atomic.Value
-
-	logFile *os.File
-
-	cgroup *cgroupHook
-	hooks  []jobHook
-}
-
-func (p *baseProvider) Name() string {
-	return p.name
-}
-
-func (p *baseProvider) EnterContext() *Context {
-	p.ctx = p.ctx.Enter()
-	return p.ctx
-}
-
-func (p *baseProvider) ExitContext() *Context {
-	p.ctx, _ = p.ctx.Exit()
-	return p.ctx
-}
-
-func (p *baseProvider) Context() *Context {
-	return p.ctx
-}
-
-func (p *baseProvider) Interval() time.Duration {
-	// logger.Debug("interval for %s: %v", p.Name(), p.interval)
-	return p.interval
-}
-
-func (p *baseProvider) IsMaster() bool {
-	return p.isMaster
-}
-
-func (p *baseProvider) WorkingDir() string {
-	if v, ok := p.ctx.Get(_WorkingDirKey); ok {
-		if s, ok := v.(string); ok {
-			return s
+	formatLogDir := func(logDir string, m mirrorConfig) string {
+		tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
+		if err != nil {
+			panic(err)
 		}
+		var formattedLogDir bytes.Buffer
+		if err := tmpl.Execute(&formattedLogDir, m); err != nil {
+			panic(err)
+		}
+		return formattedLogDir.String()
 	}
-	panic("working dir is impossible to be non-exist")
-}
 
-func (p *baseProvider) LogDir() string {
-	if v, ok := p.ctx.Get(_LogDirKey); ok {
-		if s, ok := v.(string); ok {
-			return s
-		}
+	logDir := mirror.LogDir
+	mirrorDir := mirror.MirrorDir
+	if logDir == "" {
+		logDir = cfg.Global.LogDir
 	}
-	panic("log dir is impossible to be unavailable")
-}
-
-func (p *baseProvider) LogFile() string {
-	if v, ok := p.ctx.Get(_LogFileKey); ok {
-		if s, ok := v.(string); ok {
-			return s
-		}
+	if mirrorDir == "" {
+		mirrorDir = filepath.Join(
+			cfg.Global.MirrorDir, mirror.Name,
+		)
 	}
-	panic("log dir is impossible to be unavailable")
-}
-
-func (p *baseProvider) AddHook(hook jobHook) {
-	if cg, ok := hook.(*cgroupHook); ok {
-		p.cgroup = cg
+	if mirror.Interval == 0 {
+		mirror.Interval = cfg.Global.Interval
 	}
-	p.hooks = append(p.hooks, hook)
-}
-
-func (p *baseProvider) Hooks() []jobHook {
-	return p.hooks
-}
-
-func (p *baseProvider) Cgroup() *cgroupHook {
-	return p.cgroup
-}
-
-func (p *baseProvider) prepareLogFile() error {
-	if p.LogFile() == "/dev/null" {
-		p.cmd.SetLogFile(nil)
-		return nil
+	logDir = formatLogDir(logDir, mirror)
+
+	// IsMaster
+	isMaster := true
+	if mirror.Role == "slave" {
+		isMaster = false
+	} else {
+		if mirror.Role != "" && mirror.Role != "master" {
+			logger.Warningf("Invalid role configuration for %s", mirror.Name)
+		}
 	}
-	if p.logFile == nil {
-		logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
+
+	var provider mirrorProvider
+
+	switch mirror.Provider {
+	case provCommand:
+		pc := cmdConfig{
+			name:        mirror.Name,
+			upstreamURL: mirror.Upstream,
+			command:     mirror.Command,
+			workingDir:  mirrorDir,
+			logDir:      logDir,
+			logFile:     filepath.Join(logDir, "latest.log"),
+			interval:    time.Duration(mirror.Interval) * time.Minute,
+			env:         mirror.Env,
+		}
+		p, err := newCmdProvider(pc)
+		if err != nil {
+			panic(err)
+		}
+		p.isMaster = isMaster
+		provider = p
+	case provRsync:
+		rc := rsyncConfig{
+			name:        mirror.Name,
+			upstreamURL: mirror.Upstream,
+			rsyncCmd:    mirror.Command,
+			password:    mirror.Password,
+			excludeFile: mirror.ExcludeFile,
+			workingDir:  mirrorDir,
+			logDir:      logDir,
+			logFile:     filepath.Join(logDir, "latest.log"),
+			useIPv6:     mirror.UseIPv6,
+			interval:    time.Duration(mirror.Interval) * time.Minute,
+		}
+		p, err := newRsyncProvider(rc)
+		if err != nil {
+			panic(err)
+		}
+		p.isMaster = isMaster
+		provider = p
+	case provTwoStageRsync:
+		rc := twoStageRsyncConfig{
+			name:          mirror.Name,
+			stage1Profile: mirror.Stage1Profile,
+			upstreamURL:   mirror.Upstream,
+			rsyncCmd:      mirror.Command,
+			password:      mirror.Password,
+			excludeFile:   mirror.ExcludeFile,
+			workingDir:    mirrorDir,
+			logDir:        logDir,
+			logFile:       filepath.Join(logDir, "latest.log"),
+			useIPv6:       mirror.UseIPv6,
+			interval:      time.Duration(mirror.Interval) * time.Minute,
+		}
+		p, err := newTwoStageRsyncProvider(rc)
 		if err != nil {
-			logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
-			return err
+			panic(err)
 		}
-		p.logFile = logFile
+		p.isMaster = isMaster
+		provider = p
+	default:
+		panic(errors.New("Invalid mirror provider"))
 	}
-	p.cmd.SetLogFile(p.logFile)
-	return nil
-}
-
-func (p *baseProvider) Run() error {
-	panic("Not Implemented")
-}
 
-func (p *baseProvider) Start() error {
-	panic("Not Implemented")
-}
+	// Add Logging Hook
+	provider.AddHook(newLogLimiter(provider))
 
-func (p *baseProvider) IsRunning() bool {
-	isRunning, _ := p.isRunning.Load().(bool)
-	return isRunning
-}
+	// Add Cgroup Hook
+	if cfg.Cgroup.Enable {
+		provider.AddHook(
+			newCgroupHook(provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group),
+		)
+	}
 
-func (p *baseProvider) Wait() error {
-	defer func() {
-		p.Lock()
-		p.isRunning.Store(false)
-		if p.logFile != nil {
-			p.logFile.Close()
-			p.logFile = nil
+	// ExecOnSuccess hook
+	if mirror.ExecOnSuccess != "" {
+		h, err := newExecPostHook(provider, execOnSuccess, mirror.ExecOnSuccess)
+		if err != nil {
+			logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
+			panic(err)
 		}
-		p.Unlock()
-	}()
-	return p.cmd.Wait()
-}
-
-func (p *baseProvider) Terminate() error {
-	logger.Debugf("terminating provider: %s", p.Name())
-	if !p.IsRunning() {
-		return nil
+		provider.AddHook(h)
 	}
-
-	p.Lock()
-	if p.logFile != nil {
-		p.logFile.Close()
-		p.logFile = nil
+	// ExecOnFailure hook
+	if mirror.ExecOnFailure != "" {
+		h, err := newExecPostHook(provider, execOnFailure, mirror.ExecOnFailure)
+		if err != nil {
+			logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
+			panic(err)
+		}
+		provider.AddHook(h)
 	}
-	p.Unlock()
-
-	err := p.cmd.Terminate()
-	p.isRunning.Store(false)
 
-	return err
+	return provider
 }

+ 1 - 0
worker/schedule.go

@@ -30,6 +30,7 @@ func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) {
 	q.Lock()
 	defer q.Unlock()
 	q.list.Set(schedTime, job)
+	logger.Debugf("Added job %s @ %v", job.Name(), schedTime)
 }
 
 // pop out the first job if it's time to run it

+ 96 - 148
worker/worker.go

@@ -1,12 +1,9 @@
 package worker
 
 import (
-	"bytes"
-	"errors"
 	"fmt"
-	"html/template"
 	"net/http"
-	"path/filepath"
+	"sync"
 	"time"
 
 	"github.com/gin-gonic/gin"
@@ -17,9 +14,9 @@ var tunasyncWorker *Worker
 
 // A Worker is an instance of the tunasync worker
 type Worker struct {
-	cfg       *Config
-	providers map[string]mirrorProvider
-	jobs      map[string]*mirrorJob
+	L    sync.Mutex
+	cfg  *Config
+	jobs map[string]*mirrorJob
 
 	managerChan chan jobMessage
 	semaphore   chan empty
@@ -36,9 +33,8 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
 	}
 
 	w := &Worker{
-		cfg:       cfg,
-		providers: make(map[string]mirrorProvider),
-		jobs:      make(map[string]*mirrorJob),
+		cfg:  cfg,
+		jobs: make(map[string]*mirrorJob),
 
 		managerChan: make(chan jobMessage, 32),
 		semaphore:   make(chan empty, cfg.Global.Concurrent),
@@ -61,147 +57,89 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
 	return w
 }
 
-func (w *Worker) initProviders() {
-	c := w.cfg
-
-	formatLogDir := func(logDir string, m mirrorConfig) string {
-		tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
-		if err != nil {
-			panic(err)
-		}
-		var formatedLogDir bytes.Buffer
-		tmpl.Execute(&formatedLogDir, m)
-		return formatedLogDir.String()
+func (w *Worker) initJobs() {
+	for _, mirror := range w.cfg.Mirrors {
+		// Create Provider
+		provider := newMirrorProvider(mirror, w.cfg)
+		w.jobs[provider.Name()] = newMirrorJob(provider)
 	}
+}
 
-	for _, mirror := range c.Mirrors {
-		logDir := mirror.LogDir
-		mirrorDir := mirror.MirrorDir
-		if logDir == "" {
-			logDir = c.Global.LogDir
-		}
-		if mirrorDir == "" {
-			mirrorDir = filepath.Join(
-				c.Global.MirrorDir, mirror.Name,
-			)
+// ReloadMirrorConfig refreshes the providers and jobs
+// according to the new mirror configs
+// TODO: deleted jobs should also be removed from the manager-side mirror list
+func (w *Worker) ReloadMirrorConfig(newMirrors []mirrorConfig) {
+	w.L.Lock()
+	defer w.L.Unlock()
+	logger.Info("Reloading mirror configs")
+
+	oldMirrors := w.cfg.Mirrors
+	difference := diffMirrorConfig(oldMirrors, newMirrors)
+
+	// first deal with deletions and modifications
+	for _, op := range difference {
+		if op.diffOp == diffAdd {
+			continue
 		}
-		if mirror.Interval == 0 {
-			mirror.Interval = c.Global.Interval
+		name := op.mirCfg.Name
+		job, ok := w.jobs[name]
+		if !ok {
+			logger.Warningf("Job %s not found", name)
+			continue
 		}
-		logDir = formatLogDir(logDir, mirror)
-
-		// IsMaster
-		isMaster := true
-		if mirror.Role == "slave" {
-			isMaster = false
-		} else {
-			if mirror.Role != "" && mirror.Role != "master" {
-				logger.Warningf("Invalid role configuration for %s", mirror.Name)
+		switch op.diffOp {
+		case diffDelete:
+			w.disableJob(job)
+			delete(w.jobs, name)
+			logger.Noticef("Deleted job %s", name)
+		case diffModify:
+			jobState := job.State()
+			w.disableJob(job)
+			// set new provider
+			provider := newMirrorProvider(op.mirCfg, w.cfg)
+			if err := job.SetProvider(provider); err != nil {
+				logger.Errorf("Error setting job provider of %s: %s", name, err.Error())
+				continue
 			}
-		}
 
-		var provider mirrorProvider
-
-		switch mirror.Provider {
-		case ProvCommand:
-			pc := cmdConfig{
-				name:        mirror.Name,
-				upstreamURL: mirror.Upstream,
-				command:     mirror.Command,
-				workingDir:  mirrorDir,
-				logDir:      logDir,
-				logFile:     filepath.Join(logDir, "latest.log"),
-				interval:    time.Duration(mirror.Interval) * time.Minute,
-				env:         mirror.Env,
-			}
-			p, err := newCmdProvider(pc)
-			p.isMaster = isMaster
-			if err != nil {
-				panic(err)
-			}
-			provider = p
-		case ProvRsync:
-			rc := rsyncConfig{
-				name:        mirror.Name,
-				upstreamURL: mirror.Upstream,
-				rsyncCmd:    mirror.Command,
-				password:    mirror.Password,
-				excludeFile: mirror.ExcludeFile,
-				workingDir:  mirrorDir,
-				logDir:      logDir,
-				logFile:     filepath.Join(logDir, "latest.log"),
-				useIPv6:     mirror.UseIPv6,
-				interval:    time.Duration(mirror.Interval) * time.Minute,
-			}
-			p, err := newRsyncProvider(rc)
-			p.isMaster = isMaster
-			if err != nil {
-				panic(err)
-			}
-			provider = p
-		case ProvTwoStageRsync:
-			rc := twoStageRsyncConfig{
-				name:          mirror.Name,
-				stage1Profile: mirror.Stage1Profile,
-				upstreamURL:   mirror.Upstream,
-				rsyncCmd:      mirror.Command,
-				password:      mirror.Password,
-				excludeFile:   mirror.ExcludeFile,
-				workingDir:    mirrorDir,
-				logDir:        logDir,
-				logFile:       filepath.Join(logDir, "latest.log"),
-				useIPv6:       mirror.UseIPv6,
-				interval:      time.Duration(mirror.Interval) * time.Minute,
-			}
-			p, err := newTwoStageRsyncProvider(rc)
-			p.isMaster = isMaster
-			if err != nil {
-				panic(err)
+			// re-schedule job according to its previous state
+			if jobState == stateDisabled {
+				job.SetState(stateDisabled)
+			} else if jobState == statePaused {
+				job.SetState(statePaused)
+				go job.Run(w.managerChan, w.semaphore)
+			} else {
+				job.SetState(stateReady)
+				go job.Run(w.managerChan, w.semaphore)
+				w.schedule.AddJob(time.Now(), job)
 			}
-			provider = p
-		default:
-			panic(errors.New("Invalid mirror provider"))
-
+			logger.Noticef("Reloaded job %s", name)
 		}
-
-		provider.AddHook(newLogLimiter(provider))
-
-		// Add Cgroup Hook
-		if w.cfg.Cgroup.Enable {
-			provider.AddHook(
-				newCgroupHook(provider, w.cfg.Cgroup.BasePath, w.cfg.Cgroup.Group),
-			)
+	}
+	// then start the newly added jobs
+	for _, op := range difference {
+		if op.diffOp != diffAdd {
+			continue
 		}
+		provider := newMirrorProvider(op.mirCfg, w.cfg)
+		job := newMirrorJob(provider)
+		w.jobs[provider.Name()] = job
 
-		// ExecOnSuccess hook
-		if mirror.ExecOnSuccess != "" {
-			h, err := newExecPostHook(provider, execOnSuccess, mirror.ExecOnSuccess)
-			if err != nil {
-				logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
-				panic(err)
-			}
-			provider.AddHook(h)
-		}
-		// ExecOnFailure hook
-		if mirror.ExecOnFailure != "" {
-			h, err := newExecPostHook(provider, execOnFailure, mirror.ExecOnFailure)
-			if err != nil {
-				logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
-				panic(err)
-			}
-			provider.AddHook(h)
-		}
+		job.SetState(stateReady)
+		go job.Run(w.managerChan, w.semaphore)
+		w.schedule.AddJob(time.Now(), job)
+		logger.Noticef("New job %s", job.Name())
+	}
 
-		w.providers[provider.Name()] = provider
+	w.cfg.Mirrors = newMirrors
 
-	}
 }
 
-func (w *Worker) initJobs() {
-	w.initProviders()
-
-	for name, provider := range w.providers {
-		w.jobs[name] = newMirrorJob(provider)
+func (w *Worker) disableJob(job *mirrorJob) {
+	w.schedule.Remove(job.Name())
+	if job.State() != stateDisabled {
+		job.ctrlChan <- jobDisable
+		<-job.disabled
 	}
 }
 
@@ -211,17 +149,22 @@ func (w *Worker) makeHTTPServer() {
 	s.Use(gin.Recovery())
 
 	s.POST("/", func(c *gin.Context) {
+		w.L.Lock()
+		defer w.L.Unlock()
+
 		var cmd WorkerCmd
 
 		if err := c.BindJSON(&cmd); err != nil {
 			c.JSON(http.StatusBadRequest, gin.H{"msg": "Invalid request"})
 			return
 		}
+
 		job, ok := w.jobs[cmd.MirrorID]
 		if !ok {
 			c.JSON(http.StatusNotFound, gin.H{"msg": fmt.Sprintf("Mirror ``%s'' not found", cmd.MirrorID)})
 			return
 		}
+
 		logger.Noticef("Received command: %v", cmd)
 		// if the job is disabled, start it first
 		switch cmd.Cmd {
@@ -243,11 +186,7 @@ func (w *Worker) makeHTTPServer() {
 				job.ctrlChan <- jobStop
 			}
 		case CmdDisable:
-			w.schedule.Remove(job.Name())
-			if job.State() != stateDisabled {
-				job.ctrlChan <- jobDisable
-				<-job.disabled
-			}
+			w.disableJob(job)
 		case CmdPing:
 			job.ctrlChan <- jobStart
 		default:
@@ -289,6 +228,8 @@ func (w *Worker) Run() {
 }
 
 func (w *Worker) runSchedule() {
+	w.L.Lock()
+
 	mirrorList := w.fetchJobStatus()
 	unset := make(map[string]bool)
 	for name := range w.jobs {
@@ -327,11 +268,20 @@ func (w *Worker) runSchedule() {
 		w.schedule.AddJob(time.Now(), job)
 	}
 
+	w.L.Unlock()
+
 	for {
 		select {
 		case jobMsg := <-w.managerChan:
 			// got status update from job
-			job := w.jobs[jobMsg.name]
+			w.L.Lock()
+			job, ok := w.jobs[jobMsg.name]
+			w.L.Unlock()
+			if !ok {
+				logger.Warningf("Job %s not found", jobMsg.name)
+				continue
+			}
+
 			if job.State() != stateReady {
 				logger.Infof("Job %s state is not ready, skip adding new schedule", jobMsg.name)
 				continue
@@ -341,7 +291,7 @@ func (w *Worker) runSchedule() {
 			// is running. If it's paused or disabled
 			// a sync failure signal would be emitted
 			// which needs to be ignored
-			w.updateStatus(jobMsg)
+			w.updateStatus(job, jobMsg)
 
 			// only successful or the final failure msg
 			// can trigger scheduling
@@ -361,9 +311,7 @@ func (w *Worker) runSchedule() {
 				job.ctrlChan <- jobStart
 			}
 		}
-
 	}
-
 }
 
 // Name returns worker name
@@ -397,14 +345,14 @@ func (w *Worker) registorWorker() {
 	}
 }
 
-func (w *Worker) updateStatus(jobMsg jobMessage) {
+func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
 	url := fmt.Sprintf(
 		"%s/workers/%s/jobs/%s",
 		w.cfg.Manager.APIBase,
 		w.Name(),
 		jobMsg.name,
 	)
-	p := w.providers[jobMsg.name]
+	p := job.provider
 	smsg := MirrorStatus{
 		Name:     jobMsg.name,
 		Worker:   w.cfg.Global.Name,