ソースを参照

fix(worker): fixed scheduling bugs

bigeagle 9 年 前
コミット
65984053eb
3 ファイル変更40 行追加7 行削除
  1. 14 1
      worker/schedule.go
  2. 18 0
      worker/schedule_test.go
  3. 8 6
      worker/worker.go

+ 14 - 1
worker/schedule.go

@@ -12,6 +12,7 @@ import (
 type scheduleQueue struct {
 	sync.Mutex
 	list *skiplist.SkipList
+	jobs map[string]bool
 }
 
 func timeLessThan(l, r interface{}) bool {
@@ -23,12 +24,18 @@ func timeLessThan(l, r interface{}) bool {
 func newScheduleQueue() *scheduleQueue {
 	queue := new(scheduleQueue)
 	queue.list = skiplist.NewCustomMap(timeLessThan)
+	queue.jobs = make(map[string]bool)
 	return queue
 }
 
 func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) {
 	q.Lock()
 	defer q.Unlock()
+	if _, ok := q.jobs[job.Name()]; ok {
+		logger.Warningf("Job %s already scheduled, removing the existing one", job.Name())
+		q.unsafeRemove(job.Name())
+	}
+	q.jobs[job.Name()] = true
 	q.list.Set(schedTime, job)
 	logger.Debugf("Added job %s @ %v", job.Name(), schedTime)
 }
@@ -45,10 +52,11 @@ func (q *scheduleQueue) Pop() *mirrorJob {
 	defer first.Close()
 
 	t := first.Key().(time.Time)
-	// logger.Debug("First job should run @%v", t)
 	if t.Before(time.Now()) {
 		job := first.Value().(*mirrorJob)
 		q.list.Delete(first.Key())
+		delete(q.jobs, job.Name())
+		logger.Debug("Popped out job %s @%v", job.Name(), t)
 		return job
 	}
 	return nil
@@ -58,7 +66,11 @@ func (q *scheduleQueue) Pop() *mirrorJob {
 func (q *scheduleQueue) Remove(name string) bool {
 	q.Lock()
 	defer q.Unlock()
+	return q.unsafeRemove(name)
+}
 
+// remove job
+func (q *scheduleQueue) unsafeRemove(name string) bool {
 	cur := q.list.Iterator()
 	defer cur.Close()
 
@@ -66,6 +78,7 @@ func (q *scheduleQueue) Remove(name string) bool {
 		cj := cur.Value().(*mirrorJob)
 		if cj.Name() == name {
 			q.list.Delete(cur.Key())
+			delete(q.jobs, name)
 			return true
 		}
 	}

+ 18 - 0
worker/schedule_test.go

@@ -30,6 +30,24 @@ func TestSchedule(t *testing.T) {
 			time.Sleep(1200 * time.Millisecond)
 			So(schedule.Pop(), ShouldEqual, job)
 
+		})
+		Convey("When adding one job twice", func() {
+			c := cmdConfig{
+				name: "schedule_test",
+			}
+			provider, _ := newCmdProvider(c)
+			job := newMirrorJob(provider)
+			sched := time.Now().Add(1 * time.Second)
+
+			schedule.AddJob(sched, job)
+			schedule.AddJob(sched.Add(1*time.Second), job)
+
+			So(schedule.Pop(), ShouldBeNil)
+			time.Sleep(1200 * time.Millisecond)
+			So(schedule.Pop(), ShouldBeNil)
+			time.Sleep(1200 * time.Millisecond)
+			So(schedule.Pop(), ShouldEqual, job)
+
 		})
 		Convey("When removing jobs", func() {
 			c := cmdConfig{

+ 8 - 6
worker/worker.go

@@ -109,7 +109,7 @@ func (w *Worker) ReloadMirrorConfig(newMirrors []mirrorConfig) {
 				job.SetState(statePaused)
 				go job.Run(w.managerChan, w.semaphore)
 			} else {
-				job.SetState(stateReady)
+				job.SetState(stateNone)
 				go job.Run(w.managerChan, w.semaphore)
 				w.schedule.AddJob(time.Now(), job)
 			}
@@ -125,7 +125,7 @@ func (w *Worker) ReloadMirrorConfig(newMirrors []mirrorConfig) {
 		job := newMirrorJob(provider)
 		w.jobs[provider.Name()] = job
 
-		job.SetState(stateReady)
+		job.SetState(stateNone)
 		go job.Run(w.managerChan, w.semaphore)
 		w.schedule.AddJob(time.Now(), job)
 		logger.Noticef("New job %s", job.Name())
@@ -166,6 +166,9 @@ func (w *Worker) makeHTTPServer() {
 		}
 
 		logger.Noticef("Received command: %v", cmd)
+		// No matter what command, the existing job
+		// schedule should be flushed
+		w.schedule.Remove(job.Name())
 		// if job disabled, start them first
 		switch cmd.Cmd {
 		case CmdStart, CmdRestart:
@@ -181,14 +184,13 @@ func (w *Worker) makeHTTPServer() {
 		case CmdStop:
 			// if job is disabled, no goroutine would be there
 			// receiving this signal
-			w.schedule.Remove(job.Name())
 			if job.State() != stateDisabled {
 				job.ctrlChan <- jobStop
 			}
 		case CmdDisable:
 			w.disableJob(job)
 		case CmdPing:
-			job.ctrlChan <- jobStart
+			// empty
 		default:
 			c.JSON(http.StatusNotAcceptable, gin.H{"msg": "Invalid Command"})
 			return
@@ -250,7 +252,7 @@ func (w *Worker) runSchedule() {
 				go job.Run(w.managerChan, w.semaphore)
 				continue
 			default:
-				job.SetState(stateReady)
+				job.SetState(stateNone)
 				go job.Run(w.managerChan, w.semaphore)
 				stime := m.LastUpdate.Add(job.provider.Interval())
 				logger.Debugf("Scheduling job %s @%s", job.Name(), stime.Format("2006-01-02 15:04:05"))
@@ -263,7 +265,7 @@ func (w *Worker) runSchedule() {
 	// manager's mirror list
 	for name := range unset {
 		job := w.jobs[name]
-		job.SetState(stateReady)
+		job.SetState(stateNone)
 		go job.Run(w.managerChan, w.semaphore)
 		w.schedule.AddJob(time.Now(), job)
 	}