Sfoglia il codice sorgente

feature(worker): job schedule

bigeagle 9 anni fa
parent
commit
b077db1d0b
5 ha cambiato i file con 191 aggiunte e 33 eliminazioni
  1. 42 23
      worker/job.go
  2. 11 10
      worker/job_test.go
  3. 17 0
      worker/main.go
  4. 71 0
      worker/schedule.go
  5. 50 0
      worker/schedule_test.go

+ 42 - 23
worker/job.go

@@ -25,6 +25,24 @@ type jobMessage struct {
 	msg    string
 }
 
+type mirrorJob struct {
+	provider mirrorProvider
+	ctrlChan chan ctrlAction
+	enabled  bool
+}
+
+func newMirrorJob(provider mirrorProvider) *mirrorJob {
+	return &mirrorJob{
+		provider: provider,
+		ctrlChan: make(chan ctrlAction, 1),
+		enabled:  true,
+	}
+}
+
+func (m *mirrorJob) Name() string {
+	return m.provider.Name()
+}
+
 // runMirrorJob is the goroutine where syncing job runs in
 // arguments:
 //    provider: mirror provider object
@@ -32,7 +50,9 @@ type jobMessage struct {
 //    managerChan: push messages to the manager, this channel should have a larger buffer
 //    sempaphore: make sure the concurrent running syncing job won't explode
 // TODO: message struct for managerChan
-func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerChan chan<- jobMessage, semaphore chan empty) error {
+func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
+
+	provider := m.provider
 
 	// to make code shorter
 	runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
@@ -40,10 +60,10 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 			if err := action(hook); err != nil {
 				logger.Error(
 					"failed at %s hooks for %s: %s",
-					hookname, provider.Name(), err.Error(),
+					hookname, m.Name(), err.Error(),
 				)
 				managerChan <- jobMessage{
-					tunasync.Failed, provider.Name(),
+					tunasync.Failed, m.Name(),
 					fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
 				}
 				return err
@@ -55,8 +75,8 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 	runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
 		defer close(jobDone)
 
-		managerChan <- jobMessage{tunasync.PreSyncing, provider.Name(), ""}
-		logger.Info("start syncing: %s", provider.Name())
+		managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), ""}
+		logger.Info("start syncing: %s", m.Name())
 
 		Hooks := provider.Hooks()
 		rHooks := []jobHook{}
@@ -74,7 +94,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 			stopASAP := false // stop job as soon as possible
 
 			if retry > 0 {
-				logger.Info("retry syncing: %s, retry: %d", provider.Name(), retry)
+				logger.Info("retry syncing: %s, retry: %d", m.Name(), retry)
 			}
 			err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
 			if err != nil {
@@ -82,12 +102,12 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 			}
 
 			// start syncing
-			managerChan <- jobMessage{tunasync.Syncing, provider.Name(), ""}
+			managerChan <- jobMessage{tunasync.Syncing, m.Name(), ""}
 			err = provider.Start()
 			if err != nil {
 				logger.Error(
 					"failed to start syncing job for %s: %s",
-					provider.Name(), err.Error(),
+					m.Name(), err.Error(),
 				)
 				return err
 			}
@@ -108,7 +128,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 				stopASAP = true
 				err := provider.Terminate()
 				if err != nil {
-					logger.Error("failed to terminate provider %s: %s", provider.Name(), err.Error())
+					logger.Error("failed to terminate provider %s: %s", m.Name(), err.Error())
 					return err
 				}
 				syncErr = errors.New("killed by manager")
@@ -122,8 +142,8 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 
 			if syncErr == nil {
 				// syncing success
-				logger.Info("succeeded syncing %s", provider.Name())
-				managerChan <- jobMessage{tunasync.Success, provider.Name(), ""}
+				logger.Info("succeeded syncing %s", m.Name())
+				managerChan <- jobMessage{tunasync.Success, m.Name(), ""}
 				// post-success hooks
 				err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
 				if err != nil {
@@ -134,8 +154,8 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 			}
 
 			// syncing failed
-			logger.Warning("failed syncing %s: %s", provider.Name(), syncErr.Error())
-			managerChan <- jobMessage{tunasync.Failed, provider.Name(), syncErr.Error()}
+			logger.Warning("failed syncing %s: %s", m.Name(), syncErr.Error())
+			managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error()}
 
 			// post-fail hooks
 			logger.Debug("post-fail hooks")
@@ -164,9 +184,8 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 		}
 	}
 
-	enabled := true // whether this job is stopped by the manager
 	for {
-		if enabled {
+		if m.enabled {
 			kill := make(chan empty)
 			jobDone := make(chan empty)
 			go runJob(kill, jobDone)
@@ -175,10 +194,10 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 			select {
 			case <-jobDone:
 				logger.Debug("job done")
-			case ctrl := <-ctrlChan:
+			case ctrl := <-m.ctrlChan:
 				switch ctrl {
 				case jobStop:
-					enabled = false
+					m.enabled = false
 					close(kill)
 					<-jobDone
 				case jobDisable:
@@ -186,12 +205,12 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 					<-jobDone
 					return nil
 				case jobRestart:
-					enabled = true
+					m.enabled = true
 					close(kill)
 					<-jobDone
 					continue
 				case jobStart:
-					enabled = true
+					m.enabled = true
 					goto _wait_for_job
 				default:
 					// TODO: implement this
@@ -201,16 +220,16 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh
 			}
 		}
 
-		ctrl := <-ctrlChan
+		ctrl := <-m.ctrlChan
 		switch ctrl {
 		case jobStop:
-			enabled = false
+			m.enabled = false
 		case jobDisable:
 			return nil
 		case jobRestart:
-			enabled = true
+			m.enabled = true
 		case jobStart:
-			enabled = true
+			m.enabled = true
 		default:
 			// TODO
 			return nil

+ 11 - 10
worker/job_test.go

@@ -63,11 +63,11 @@ func TestMirrorJob(t *testing.T) {
 			So(readedScriptContent, ShouldResemble, []byte(scriptContent))
 
 			Convey("If we let it run several times", func(ctx C) {
-				ctrlChan := make(chan ctrlAction)
 				managerChan := make(chan jobMessage, 10)
 				semaphore := make(chan empty, 1)
+				job := newMirrorJob(provider)
 
-				go runMirrorJob(provider, ctrlChan, managerChan, semaphore)
+				go job.Run(managerChan, semaphore)
 				for i := 0; i < 2; i++ {
 					msg := <-managerChan
 					So(msg.status, ShouldEqual, PreSyncing)
@@ -78,7 +78,7 @@ func TestMirrorJob(t *testing.T) {
 					loggedContent, err := ioutil.ReadFile(provider.LogFile())
 					So(err, ShouldBeNil)
 					So(string(loggedContent), ShouldEqual, exceptedOutput)
-					ctrlChan <- jobStart
+					job.ctrlChan <- jobStart
 				}
 				select {
 				case msg := <-managerChan:
@@ -92,7 +92,7 @@ func TestMirrorJob(t *testing.T) {
 					So(0, ShouldEqual, 1)
 				}
 
-				ctrlChan <- jobDisable
+				job.ctrlChan <- jobDisable
 				select {
 				case <-managerChan:
 					So(0, ShouldEqual, 1) // made this fail
@@ -112,12 +112,12 @@ echo $TUNASYNC_WORKING_DIR
 			err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
 			So(err, ShouldBeNil)
 
-			ctrlChan := make(chan ctrlAction)
 			managerChan := make(chan jobMessage, 10)
 			semaphore := make(chan empty, 1)
+			job := newMirrorJob(provider)
 
 			Convey("If we kill it", func(ctx C) {
-				go runMirrorJob(provider, ctrlChan, managerChan, semaphore)
+				go job.Run(managerChan, semaphore)
 
 				time.Sleep(1 * time.Second)
 				msg := <-managerChan
@@ -125,7 +125,7 @@ echo $TUNASYNC_WORKING_DIR
 				msg = <-managerChan
 				So(msg.status, ShouldEqual, Syncing)
 
-				ctrlChan <- jobStop
+				job.ctrlChan <- jobStop
 
 				msg = <-managerChan
 				So(msg.status, ShouldEqual, Failed)
@@ -134,11 +134,12 @@ echo $TUNASYNC_WORKING_DIR
 				loggedContent, err := ioutil.ReadFile(provider.LogFile())
 				So(err, ShouldBeNil)
 				So(string(loggedContent), ShouldEqual, exceptedOutput)
-				ctrlChan <- jobDisable
+				job.ctrlChan <- jobDisable
 			})
 
 			Convey("If we don't kill it", func(ctx C) {
-				go runMirrorJob(provider, ctrlChan, managerChan, semaphore)
+				go job.Run(managerChan, semaphore)
+
 				msg := <-managerChan
 				So(msg.status, ShouldEqual, PreSyncing)
 				msg = <-managerChan
@@ -154,7 +155,7 @@ echo $TUNASYNC_WORKING_DIR
 				loggedContent, err := ioutil.ReadFile(provider.LogFile())
 				So(err, ShouldBeNil)
 				So(string(loggedContent), ShouldEqual, exceptedOutput)
-				ctrlChan <- jobDisable
+				job.ctrlChan <- jobDisable
 			})
 		})
 

+ 17 - 0
worker/main.go

@@ -0,0 +1,17 @@
+package worker
+
+import "time"
+
+// toplevel module for workers
+
+func main() {
+
+	for {
+		// if time.Now().After() {
+		//
+		// }
+
+		time.Sleep(1 * time.Second)
+	}
+
+}

+ 71 - 0
worker/schedule.go

@@ -0,0 +1,71 @@
+package worker
+
+// schedule queue for jobs
+
+import (
+	"sync"
+	"time"
+
+	"github.com/ryszard/goskiplist/skiplist"
+)
+
+type scheduleQueue struct {
+	sync.Mutex
+	list *skiplist.SkipList
+}
+
+func timeLessThan(l, r interface{}) bool {
+	tl := l.(time.Time)
+	tr := r.(time.Time)
+	return tl.Before(tr)
+}
+
+func newScheduleQueue() *scheduleQueue {
+	queue := new(scheduleQueue)
+	queue.list = skiplist.NewCustomMap(timeLessThan)
+	return queue
+}
+
+func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) {
+	q.Lock()
+	defer q.Unlock()
+	q.list.Set(schedTime, job)
+}
+
+// pop out the first job if it's time to run it
+func (q *scheduleQueue) Pop() *mirrorJob {
+	q.Lock()
+	defer q.Unlock()
+
+	first := q.list.SeekToFirst()
+	if first == nil {
+		return nil
+	}
+	defer first.Close()
+
+	t := first.Key().(time.Time)
+	if t.Before(time.Now()) {
+		job := first.Value().(*mirrorJob)
+		q.list.Delete(first.Key())
+		return job
+	}
+	return nil
+}
+
+// remove job
+func (q *scheduleQueue) Remove(name string) bool {
+	q.Lock()
+	defer q.Unlock()
+
+	cur := q.list.Iterator()
+	defer cur.Close()
+
+	for cur.Next() {
+		cj := cur.Value().(*mirrorJob)
+		if cj.Name() == name {
+			q.list.Delete(cur.Key())
+			return true
+		}
+	}
+	return false
+}

+ 50 - 0
worker/schedule_test.go

@@ -0,0 +1,50 @@
+package worker
+
+import (
+	"testing"
+	"time"
+
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+func TestSchedule(t *testing.T) {
+
+	Convey("MirrorJobSchedule should work", t, func(ctx C) {
+		schedule := newScheduleQueue()
+
+		Convey("When poping on empty schedule", func() {
+			job := schedule.Pop()
+			So(job, ShouldBeNil)
+		})
+
+		Convey("When adding some jobs", func() {
+			c := cmdConfig{
+				name: "schedule_test",
+			}
+			provider, _ := newCmdProvider(c)
+			job := newMirrorJob(provider)
+			sched := time.Now().Add(1 * time.Second)
+
+			schedule.AddJob(sched, job)
+			So(schedule.Pop(), ShouldBeNil)
+			time.Sleep(1200 * time.Millisecond)
+			So(schedule.Pop(), ShouldEqual, job)
+
+		})
+		Convey("When removing jobs", func() {
+			c := cmdConfig{
+				name: "schedule_test",
+			}
+			provider, _ := newCmdProvider(c)
+			job := newMirrorJob(provider)
+			sched := time.Now().Add(1 * time.Second)
+
+			schedule.AddJob(sched, job)
+			So(schedule.Remove("something"), ShouldBeFalse)
+			So(schedule.Remove("schedule_test"), ShouldBeTrue)
+			time.Sleep(1200 * time.Millisecond)
+			So(schedule.Pop(), ShouldBeNil)
+		})
+
+	})
+}