Browse Source

feature(worker): runMirrorJob no longer controls the interval

bigeagle 9 years ago
parent
commit
6b05a5894e
3 changed files with 32 additions and 28 deletions
  1. 2 2
      internal/logger.go
  2. 25 25
      worker/job.go
  3. 5 1
      worker/job_test.go

+ 2 - 2
internal/logger.go

@@ -10,9 +10,9 @@ import (
 func InitLogger(verbose, debug, withSystemd bool) {
 func InitLogger(verbose, debug, withSystemd bool) {
 	var fmtString string
 	var fmtString string
 	if withSystemd {
 	if withSystemd {
-		fmtString = "\r[%{level:.6s}] %{message}"
+		fmtString = "[%{level:.6s}] %{message}"
 	} else {
 	} else {
-		fmtString = "\r%{color}[%{time:06-01-02 15:04:05}][%{level:.6s}]%{color:reset} %{message}"
+		fmtString = "%{color}[%{time:06-01-02 15:04:05}][%{level:.6s}][%{shortfile}]%{color:reset} %{message}"
 	}
 	}
 	format := logging.MustStringFormatter(fmtString)
 	format := logging.MustStringFormatter(fmtString)
 	logging.SetFormatter(format)
 	logging.SetFormatter(format)

+ 25 - 25
worker/job.go

@@ -1,9 +1,6 @@
 package worker
 package worker
 
 
-import (
-	"errors"
-	"time"
-)
+import "errors"
 
 
 // this file contains the workflow of a mirror jb
 // this file contains the workflow of a mirror jb
 
 
@@ -41,7 +38,8 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 	}
 	}
 
 
 	runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
 	runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
-		defer func() { jobDone <- empty{} }()
+		defer close(jobDone)
+
 		logger.Info("start syncing: %s", provider.Name())
 		logger.Info("start syncing: %s", provider.Name())
 
 
 		Hooks := provider.Hooks()
 		Hooks := provider.Hooks()
@@ -89,6 +87,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 			case syncErr = <-syncDone:
 			case syncErr = <-syncDone:
 				logger.Debug("syncing done")
 				logger.Debug("syncing done")
 			case <-kill:
 			case <-kill:
+				logger.Debug("received kill")
 				stopASAP = true
 				stopASAP = true
 				err := provider.Terminate()
 				err := provider.Terminate()
 				if err != nil {
 				if err != nil {
@@ -118,15 +117,18 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 			}
 			}
 
 
 			// syncing failed
 			// syncing failed
-			logger.Info("failed syncing %s: %s", provider.Name(), err.Error())
+			logger.Warning("failed syncing %s: %s", provider.Name(), syncErr.Error())
 			managerChan <- struct{}{}
 			managerChan <- struct{}{}
+
 			// post-fail hooks
 			// post-fail hooks
+			logger.Debug("post-fail hooks")
 			err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
 			err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
 			// gracefully exit
 			// gracefully exit
 			if stopASAP {
 			if stopASAP {
+				logger.Debug("No retry, exit directly")
 				return nil
 				return nil
 			}
 			}
 			// continue to next retry
 			// continue to next retry
@@ -140,6 +142,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 			defer func() { semaphore <- empty{} }()
 			defer func() { semaphore <- empty{} }()
 			runJobWrapper(kill, jobDone)
 			runJobWrapper(kill, jobDone)
 		case <-kill:
 		case <-kill:
+			jobDone <- empty{}
 			return
 			return
 		}
 		}
 	}
 	}
@@ -160,12 +163,15 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 				case jobStop:
 				case jobStop:
 					enabled = false
 					enabled = false
 					close(kill)
 					close(kill)
+					<-jobDone
 				case jobDisable:
 				case jobDisable:
 					close(kill)
 					close(kill)
+					<-jobDone
 					return nil
 					return nil
 				case jobRestart:
 				case jobRestart:
 					enabled = true
 					enabled = true
 					close(kill)
 					close(kill)
+					<-jobDone
 					continue
 					continue
 				case jobStart:
 				case jobStart:
 					enabled = true
 					enabled = true
@@ -178,25 +184,19 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 			}
 			}
 		}
 		}
 
 
-		select {
-		case <-time.After(provider.Interval()):
-			continue
-		case ctrl := <-ctrlChan:
-			switch ctrl {
-			case jobStop:
-				enabled = false
-			case jobDisable:
-				return nil
-			case jobRestart:
-				enabled = true
-			case jobStart:
-				enabled = true
-			default:
-				// TODO
-				return nil
-			}
+		ctrl := <-ctrlChan
+		switch ctrl {
+		case jobStop:
+			enabled = false
+		case jobDisable:
+			return nil
+		case jobRestart:
+			enabled = true
+		case jobStart:
+			enabled = true
+		default:
+			// TODO
+			return nil
 		}
 		}
 	}
 	}
-
-	return nil
 }
 }

+ 5 - 1
worker/job_test.go

@@ -9,10 +9,13 @@ import (
 	"time"
 	"time"
 
 
 	. "github.com/smartystreets/goconvey/convey"
 	. "github.com/smartystreets/goconvey/convey"
+	. "github.com/tuna/tunasync/internal"
 )
 )
 
 
 func TestMirrorJob(t *testing.T) {
 func TestMirrorJob(t *testing.T) {
 
 
+	InitLogger(true, true, false)
+
 	Convey("MirrorJob should work", t, func(ctx C) {
 	Convey("MirrorJob should work", t, func(ctx C) {
 		tmpDir, err := ioutil.TempDir("", "tunasync")
 		tmpDir, err := ioutil.TempDir("", "tunasync")
 		defer os.RemoveAll(tmpDir)
 		defer os.RemoveAll(tmpDir)
@@ -71,6 +74,7 @@ func TestMirrorJob(t *testing.T) {
 					loggedContent, err := ioutil.ReadFile(provider.LogFile())
 					loggedContent, err := ioutil.ReadFile(provider.LogFile())
 					So(err, ShouldBeNil)
 					So(err, ShouldBeNil)
 					So(string(loggedContent), ShouldEqual, exceptedOutput)
 					So(string(loggedContent), ShouldEqual, exceptedOutput)
+					ctrlChan <- jobStart
 				}
 				}
 				select {
 				select {
 				case <-managerChan:
 				case <-managerChan:
@@ -107,7 +111,7 @@ echo $TUNASYNC_WORKING_DIR
 				go runMirrorJob(provider, ctrlChan, managerChan, semaphore)
 				go runMirrorJob(provider, ctrlChan, managerChan, semaphore)
 				time.Sleep(1 * time.Second)
 				time.Sleep(1 * time.Second)
 				ctrlChan <- jobStop
 				ctrlChan <- jobStop
-				time.Sleep(1 * time.Second)
+				<-managerChan
 				exceptedOutput := fmt.Sprintf("%s\n", provider.WorkingDir())
 				exceptedOutput := fmt.Sprintf("%s\n", provider.WorkingDir())
 				loggedContent, err := ioutil.ReadFile(provider.LogFile())
 				loggedContent, err := ioutil.ReadFile(provider.LogFile())
 				So(err, ShouldBeNil)
 				So(err, ShouldBeNil)