7IN0SAN9 8 jaren geleden
bovenliggende
commit
563860d424

+ 5 - 0
worker/base_provider.go

@@ -15,6 +15,7 @@ type baseProvider struct {
 	ctx      *Context
 	name     string
 	interval time.Duration
+	retry    int
 	isMaster bool
 
 	cmd       *cmdJob
@@ -52,6 +53,10 @@ func (p *baseProvider) Interval() time.Duration {
 	return p.interval
 }
 
+func (p *baseProvider) Retry() int {
+	return p.retry
+}
+
 func (p *baseProvider) IsMaster() bool {
 	return p.isMaster
 }

+ 2 - 0
worker/cmd_provider.go

@@ -11,6 +11,7 @@ type cmdConfig struct {
 	upstreamURL, command        string
 	workingDir, logDir, logFile string
 	interval                    time.Duration
+	retry                       int
 	env                         map[string]string
 }
 
@@ -27,6 +28,7 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
 			name:     c.name,
 			ctx:      NewContext(),
 			interval: c.interval,
+			retry:    c.retry,
 		},
 		cmdConfig: c,
 	}

+ 1 - 1
worker/common.go

@@ -8,6 +8,6 @@ import (
 
 type empty struct{}
 
-const maxRetry = 2
+const defaultMaxRetry = 2
 
 var logger = logging.MustGetLogger("tunasync")

+ 2 - 0
worker/config.go

@@ -49,6 +49,7 @@ type globalConfig struct {
 	MirrorDir  string `toml:"mirror_dir"`
 	Concurrent int    `toml:"concurrent"`
 	Interval   int    `toml:"interval"`
+	Retry      int    `toml:"retry"`
 
 	ExecOnSuccess []string `toml:"exec_on_success"`
 	ExecOnFailure []string `toml:"exec_on_failure"`
@@ -108,6 +109,7 @@ type mirrorConfig struct {
 	Provider  providerEnum      `toml:"provider"`
 	Upstream  string            `toml:"upstream"`
 	Interval  int               `toml:"interval"`
+	Retry     int               `toml:"retry"`
 	MirrorDir string            `toml:"mirror_dir"`
 	LogDir    string            `toml:"log_dir"`
 	Env       map[string]string `toml:"env"`

+ 4 - 0
worker/config_test.go

@@ -18,6 +18,7 @@ log_dir = "/var/log/tunasync/{{.Name}}"
 mirror_dir = "/data/mirrors"
 concurrent = 10
 interval = 240
+retry = 3
 
 [manager]
 api_base = "https://127.0.0.1:5000"
@@ -35,6 +36,7 @@ name = "AOSP"
 provider = "command"
 upstream = "https://aosp.google.com/"
 interval = 720
+retry = 2
 mirror_dir = "/data/git/AOSP"
 exec_on_success = [
 	"bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'"
@@ -116,6 +118,7 @@ use_ipv6 = true
 		So(err, ShouldBeNil)
 		So(cfg.Global.Name, ShouldEqual, "test_worker")
 		So(cfg.Global.Interval, ShouldEqual, 240)
+		So(cfg.Global.Retry, ShouldEqual, 3)
 		So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors")
 
 		So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000")
@@ -126,6 +129,7 @@ use_ipv6 = true
 		So(m.MirrorDir, ShouldEqual, "/data/git/AOSP")
 		So(m.Provider, ShouldEqual, provCommand)
 		So(m.Interval, ShouldEqual, 720)
+		So(m.Retry, ShouldEqual, 2)
 		So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo")
 
 		m = cfg.Mirrors[1]

+ 1 - 1
worker/exec_post_test.go

@@ -92,7 +92,7 @@ exit 1
 			job.ctrlChan <- jobStart
 			msg := <-managerChan
 			So(msg.status, ShouldEqual, PreSyncing)
-			for i := 0; i < maxRetry; i++ {
+			for i := 0; i < defaultMaxRetry; i++ {
 				msg = <-managerChan
 				So(msg.status, ShouldEqual, Syncing)
 				msg = <-managerChan

+ 2 - 2
worker/job.go

@@ -136,7 +136,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 			return err
 		}
 
-		for retry := 0; retry < maxRetry; retry++ {
+		for retry := 0; retry < provider.Retry(); retry++ {
 			stopASAP := false // stop job as soon as possible
 
 			if retry > 0 {
@@ -194,7 +194,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 
 			// syncing failed
 			logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
-			managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)}
+			managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == provider.Retry()-1) && (m.State() == stateReady)}
 
 			// post-fail hooks
 			logger.Debug("post-fail hooks")

+ 7 - 0
worker/provider.go

@@ -45,6 +45,7 @@ type mirrorProvider interface {
 	Hooks() []jobHook
 
 	Interval() time.Duration
+	Retry() int
 
 	WorkingDir() string
 	LogDir() string
@@ -86,6 +87,9 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 	if mirror.Interval == 0 {
 		mirror.Interval = cfg.Global.Interval
 	}
+	if mirror.Retry == 0 {
+		mirror.Retry = cfg.Global.Retry
+	}
 	logDir = formatLogDir(logDir, mirror)
 
 	// IsMaster
@@ -110,6 +114,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 			logDir:      logDir,
 			logFile:     filepath.Join(logDir, "latest.log"),
 			interval:    time.Duration(mirror.Interval) * time.Minute,
+			retry:       mirror.Retry,
 			env:         mirror.Env,
 		}
 		p, err := newCmdProvider(pc)
@@ -131,6 +136,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 			logFile:     filepath.Join(logDir, "latest.log"),
 			useIPv6:     mirror.UseIPv6,
 			interval:    time.Duration(mirror.Interval) * time.Minute,
+			retry:       mirror.Retry,
 		}
 		p, err := newRsyncProvider(rc)
 		p.isMaster = isMaster
@@ -152,6 +158,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 			logFile:       filepath.Join(logDir, "latest.log"),
 			useIPv6:       mirror.UseIPv6,
 			interval:      time.Duration(mirror.Interval) * time.Minute,
+			retry:         mirror.Retry,
 		}
 		p, err := newTwoStageRsyncProvider(rc)
 		p.isMaster = isMaster

+ 2 - 0
worker/rsync_provider.go

@@ -13,6 +13,7 @@ type rsyncConfig struct {
 	workingDir, logDir, logFile                  string
 	useIPv6                                      bool
 	interval                                     time.Duration
+	retry                                        int
 }
 
 // An RsyncProvider provides the implementation to rsync-based syncing jobs
@@ -32,6 +33,7 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
 			name:     c.name,
 			ctx:      NewContext(),
 			interval: c.interval,
+			retry:    c.retry,
 		},
 		rsyncConfig: c,
 	}

+ 2 - 0
worker/two_stage_rsync_provider.go

@@ -15,6 +15,7 @@ type twoStageRsyncConfig struct {
 	workingDir, logDir, logFile                  string
 	useIPv6                                      bool
 	interval                                     time.Duration
+	retry                                        int
 }
 
 // An RsyncProvider provides the implementation to rsync-based syncing jobs
@@ -44,6 +45,7 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er
 			name:     c.name,
 			ctx:      NewContext(),
 			interval: c.interval,
+			retry:    c.retry,
 		},
 		twoStageRsyncConfig: c,
 		stage1Options: []string{

+ 4 - 0
worker/worker.go

@@ -35,6 +35,10 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
 		return tunasyncWorker
 	}
 
+	if cfg.Global.Retry == 0 {
+		cfg.Global.Retry = defaultMaxRetry
+	}
+
 	w := &Worker{
 		cfg:  cfg,
 		jobs: make(map[string]*mirrorJob),