浏览代码

feature(API): error message in manager channel

bigeagle 9 年之前
父节点
当前提交
f31bcfbcc3
共有 3 个文件被更改,包括 56 次插入12 次删除
  1. 1 0
      internal/msg.go
  2. 22 5
      worker/job.go
  3. 33 7
      worker/job_test.go

+ 1 - 0
internal/msg.go

@@ -12,6 +12,7 @@ type StatusUpdateMsg struct {
 	LastUpdate time.Time  `json:"last_update"`
 	Upstream   string     `json:"upstream"`
 	Size       string     `json:"size"`
+	ErrorMsg   string     `json:"error_msg"`
 }
 
 // A WorkerInfoMsg is

+ 22 - 5
worker/job.go

@@ -1,6 +1,11 @@
 package worker
 
-import "errors"
+import (
+	"errors"
+	"fmt"
+
+	tunasync "github.com/tuna/tunasync/internal"
+)
 
 // this file contains the workflow of a mirror jb
 
@@ -14,14 +19,20 @@ const (
 	jobPing               // ensure the goroutine is alive
 )
 
+type jobMessage struct {
+	status tunasync.SyncStatus
+	name   string
+	msg    string
+}
+
 // runMirrorJob is the goroutine where syncing job runs in
 // arguments:
 //    provider: mirror provider object
 //    ctrlChan: receives messages from the manager
-//    managerChan: push messages to the manager
+//    managerChan: push messages to the manager, this channel should have a larger buffer
 //    sempaphore: make sure the concurrent running syncing job won't explode
 // TODO: message struct for managerChan
-func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerChan chan<- struct{}, semaphore chan empty) error {
+func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerChan chan<- jobMessage, semaphore chan empty) error {
 
 	// to make code shorter
 	runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
@@ -31,6 +42,10 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 					"failed at %s hooks for %s: %s",
 					hookname, provider.Name(), err.Error(),
 				)
+				managerChan <- jobMessage{
+					tunasync.Failed, provider.Name(),
+					fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
+				}
 				return err
 			}
 		}
@@ -40,6 +55,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 	runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
 		defer close(jobDone)
 
+		managerChan <- jobMessage{tunasync.PreSyncing, provider.Name(), ""}
 		logger.Info("start syncing: %s", provider.Name())
 
 		Hooks := provider.Hooks()
@@ -66,6 +82,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 			}
 
 			// start syncing
+			managerChan <- jobMessage{tunasync.Syncing, provider.Name(), ""}
 			err = provider.Start()
 			if err != nil {
 				logger.Error(
@@ -106,7 +123,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 			if syncErr == nil {
 				// syncing success
 				logger.Info("succeeded syncing %s", provider.Name())
-				managerChan <- struct{}{}
+				managerChan <- jobMessage{tunasync.Success, provider.Name(), ""}
 				// post-success hooks
 				err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
 				if err != nil {
@@ -118,7 +135,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 
 			// syncing failed
 			logger.Warning("failed syncing %s: %s", provider.Name(), syncErr.Error())
-			managerChan <- struct{}{}
+			managerChan <- jobMessage{tunasync.Failed, provider.Name(), syncErr.Error()}
 
 			// post-fail hooks
 			logger.Debug("post-fail hooks")

+ 33 - 7
worker/job_test.go

@@ -64,23 +64,34 @@ func TestMirrorJob(t *testing.T) {
 
 			Convey("If we let it run several times", func(ctx C) {
 				ctrlChan := make(chan ctrlAction)
-				managerChan := make(chan struct{})
+				managerChan := make(chan jobMessage, 10)
 				semaphore := make(chan empty, 1)
 
 				go runMirrorJob(provider, ctrlChan, managerChan, semaphore)
 				for i := 0; i < 2; i++ {
-					<-managerChan
+					msg := <-managerChan
+					So(msg.status, ShouldEqual, PreSyncing)
+					msg = <-managerChan
+					So(msg.status, ShouldEqual, Syncing)
+					msg = <-managerChan
+					So(msg.status, ShouldEqual, Success)
 					loggedContent, err := ioutil.ReadFile(provider.LogFile())
 					So(err, ShouldBeNil)
 					So(string(loggedContent), ShouldEqual, exceptedOutput)
 					ctrlChan <- jobStart
 				}
 				select {
-				case <-managerChan:
-					So(0, ShouldEqual, 0) // made this fail
+				case msg := <-managerChan:
+					So(msg.status, ShouldEqual, PreSyncing)
+					msg = <-managerChan
+					So(msg.status, ShouldEqual, Syncing)
+					msg = <-managerChan
+					So(msg.status, ShouldEqual, Success)
+
 				case <-time.After(2 * time.Second):
 					So(0, ShouldEqual, 1)
 				}
+
 				ctrlChan <- jobDisable
 				select {
 				case <-managerChan:
@@ -102,23 +113,38 @@ echo $TUNASYNC_WORKING_DIR
 			So(err, ShouldBeNil)
 
 			ctrlChan := make(chan ctrlAction)
-			managerChan := make(chan struct{})
+			managerChan := make(chan jobMessage, 10)
 			semaphore := make(chan empty, 1)
 
 			Convey("If we kill it", func(ctx C) {
 				go runMirrorJob(provider, ctrlChan, managerChan, semaphore)
+
 				time.Sleep(1 * time.Second)
+				msg := <-managerChan
+				So(msg.status, ShouldEqual, PreSyncing)
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Syncing)
+
 				ctrlChan <- jobStop
-				<-managerChan
+
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Failed)
+
 				exceptedOutput := fmt.Sprintf("%s\n", provider.WorkingDir())
 				loggedContent, err := ioutil.ReadFile(provider.LogFile())
 				So(err, ShouldBeNil)
 				So(string(loggedContent), ShouldEqual, exceptedOutput)
 				ctrlChan <- jobDisable
 			})
+
 			Convey("If we don't kill it", func(ctx C) {
 				go runMirrorJob(provider, ctrlChan, managerChan, semaphore)
-				<-managerChan
+				msg := <-managerChan
+				So(msg.status, ShouldEqual, PreSyncing)
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Syncing)
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Success)
 
 				exceptedOutput := fmt.Sprintf(
 					"%s\n%s\n",