Просмотр исходного кода

fix(worker): fixed job status and control logic

bigeagle 9 лет назад
Родитель
Коммит
292a24ba20
6 измененных файлов: 174 добавлено и 70 удалено
  1. 27 1
      internal/msg.go
  2. 34 0
      manager/server.go
  3. 44 36
      worker/job.go
  4. 1 0
      worker/provider.go
  5. 1 0
      worker/schedule.go
  6. 67 33
      worker/worker.go

+ 27 - 1
internal/msg.go

@@ -1,6 +1,9 @@
 package internal
 
-import "time"
+import (
+	"fmt"
+	"time"
+)
 
 // A StatusUpdateMsg represents a msg when
 // a worker has done syncing
@@ -34,6 +37,22 @@ const (
 	CmdPing            // ensure the goroutine is alive
 )
 
+func (c CmdVerb) String() string {
+	switch c {
+	case CmdStart:
+		return "start"
+	case CmdStop:
+		return "stop"
+	case CmdDisable:
+		return "disable"
+	case CmdRestart:
+		return "restart"
+	case CmdPing:
+		return "ping"
+	}
+	return "unknown"
+}
+
 // A WorkerCmd is the command message sent from the
 // manager to a worker
 type WorkerCmd struct {
@@ -42,6 +61,13 @@ type WorkerCmd struct {
 	Args     []string `json:"args"`
 }
 
+func (c WorkerCmd) String() string {
+	if len(c.Args) > 0 {
+		return fmt.Sprintf("%v (%s, %v)", c.Cmd, c.MirrorID, c.Args)
+	}
+	return fmt.Sprintf("%v (%s)", c.Cmd, c.MirrorID)
+}
+
 // A ClientCmd is the command message sent from the client
 // to the manager
 type ClientCmd struct {

+ 34 - 0
manager/server.go

@@ -194,6 +194,24 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
 	var status MirrorStatus
 	c.BindJSON(&status)
 	mirrorName := status.Name
+
+	curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
+	if err != nil {
+		err := fmt.Errorf("failed to get job %s of worker %s: %s",
+			mirrorName, workerID, err.Error(),
+		)
+		c.Error(err)
+		s.returnErrJSON(c, http.StatusInternalServerError, err)
+		return
+	}
+
+	// Only successful syncing needs last_update
+	if status.Status == Success {
+		status.LastUpdate = time.Now()
+	} else {
+		status.LastUpdate = curStatus.LastUpdate
+	}
+
 	newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
 	if err != nil {
 		err := fmt.Errorf("failed to update job %s of worker %s: %s",
@@ -231,6 +249,22 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
 		Args:     clientCmd.Args,
 	}
 
+	// update job status, even if the job did not disable successfully,
+	// this status should be set as disabled
+	curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
+	changed := false
+	switch clientCmd.Cmd {
+	case CmdDisable:
+		curStat.Status = Disabled
+		changed = true
+	case CmdStop:
+		curStat.Status = Paused
+		changed = true
+	}
+	if changed {
+		s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
+	}
+
 	// post command to worker
 	_, err = PostJSON(workerURL, workerCmd, s.tlsConfig)
 	if err != nil {

+ 44 - 36
worker/job.go

@@ -20,23 +20,28 @@ const (
 )
 
 type jobMessage struct {
-	status tunasync.SyncStatus
-	name   string
-	msg    string
+	status   tunasync.SyncStatus
+	name     string
+	msg      string
+	schedule bool
 }
 
 type mirrorJob struct {
-	provider mirrorProvider
-	ctrlChan chan ctrlAction
-	disabled chan empty
-	enabled  bool
+	provider   mirrorProvider
+	ctrlChan   chan ctrlAction
+	disabled   chan empty
+	started    bool
+	schedule   bool
+	isDisabled bool
 }
 
 func newMirrorJob(provider mirrorProvider) *mirrorJob {
 	return &mirrorJob{
-		provider: provider,
-		ctrlChan: make(chan ctrlAction, 1),
-		enabled:  false,
+		provider:   provider,
+		ctrlChan:   make(chan ctrlAction, 1),
+		started:    false,
+		schedule:   false,
+		isDisabled: false,
 	}
 }
 
@@ -44,18 +49,6 @@ func (m *mirrorJob) Name() string {
 	return m.provider.Name()
 }
 
-func (m *mirrorJob) Disabled() bool {
-	if !m.enabled {
-		return true
-	}
-	select {
-	case <-m.disabled:
-		return true
-	default:
-		return false
-	}
-}
-
 // runMirrorJob is the goroutine where syncing job runs in
 // arguments:
 //    provider: mirror provider object
@@ -66,7 +59,11 @@ func (m *mirrorJob) Disabled() bool {
 func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
 
 	m.disabled = make(chan empty)
-	defer close(m.disabled)
+	defer func() {
+		close(m.disabled)
+		m.schedule = false
+		m.isDisabled = true
+	}()
 
 	provider := m.provider
 
@@ -81,6 +78,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 				managerChan <- jobMessage{
 					tunasync.Failed, m.Name(),
 					fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
+					false,
 				}
 				return err
 			}
@@ -91,7 +89,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 	runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
 		defer close(jobDone)
 
-		managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), ""}
+		managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), "", false}
 		logger.Info("start syncing: %s", m.Name())
 
 		Hooks := provider.Hooks()
@@ -118,7 +116,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 			}
 
 			// start syncing
-			managerChan <- jobMessage{tunasync.Syncing, m.Name(), ""}
+			managerChan <- jobMessage{tunasync.Syncing, m.Name(), "", false}
 
 			var syncErr error
 			syncDone := make(chan error, 1)
@@ -152,7 +150,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 			if syncErr == nil {
 				// syncing success
 				logger.Info("succeeded syncing %s", m.Name())
-				managerChan <- jobMessage{tunasync.Success, m.Name(), ""}
+				managerChan <- jobMessage{tunasync.Success, m.Name(), "", true}
 				// post-success hooks
 				err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
 				if err != nil {
@@ -164,7 +162,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 
 			// syncing failed
 			logger.Warning("failed syncing %s: %s", m.Name(), syncErr.Error())
-			managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error()}
+			managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), retry == maxRetry-1}
 
 			// post-fail hooks
 			logger.Debug("post-fail hooks")
@@ -194,7 +192,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 	}
 
 	for {
-		if m.enabled {
+		if m.started {
 			kill := make(chan empty)
 			jobDone := make(chan empty)
 			go runJob(kill, jobDone)
@@ -206,21 +204,24 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 			case ctrl := <-m.ctrlChan:
 				switch ctrl {
 				case jobStop:
-					m.enabled = false
+					m.schedule = false
+					m.started = false
 					close(kill)
 					<-jobDone
 				case jobDisable:
-					m.enabled = false
+					m.schedule = false
+					m.isDisabled = true
+					m.started = false
 					close(kill)
 					<-jobDone
 					return nil
 				case jobRestart:
-					m.enabled = true
+					m.started = true
 					close(kill)
 					<-jobDone
 					continue
 				case jobStart:
-					m.enabled = true
+					m.started = true
 					goto _wait_for_job
 				default:
 					// TODO: implement this
@@ -233,14 +234,21 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 		ctrl := <-m.ctrlChan
 		switch ctrl {
 		case jobStop:
-			m.enabled = false
+			m.schedule = false
+			m.started = false
 		case jobDisable:
-			m.enabled = false
+			m.schedule = false
+			m.isDisabled = true
+			m.started = false
 			return nil
 		case jobRestart:
-			m.enabled = true
+			m.schedule = true
+			m.isDisabled = false
+			m.started = true
 		case jobStart:
-			m.enabled = true
+			m.schedule = true
+			m.isDisabled = false
+			m.started = true
 		default:
 			// TODO
 			return nil

+ 1 - 0
worker/provider.go

@@ -85,6 +85,7 @@ func (p *baseProvider) Context() *Context {
 }
 
 func (p *baseProvider) Interval() time.Duration {
+	// logger.Debug("interval for %s: %v", p.Name(), p.interval)
 	return p.interval
 }
 

+ 1 - 0
worker/schedule.go

@@ -44,6 +44,7 @@ func (q *scheduleQueue) Pop() *mirrorJob {
 	defer first.Close()
 
 	t := first.Key().(time.Time)
+	// logger.Debug("First job should run @%v", t)
 	if t.Before(time.Now()) {
 		job := first.Value().(*mirrorJob)
 		q.list.Delete(first.Key())

+ 67 - 33
worker/worker.go

@@ -28,8 +28,6 @@ type Worker struct {
 	schedule   *scheduleQueue
 	httpServer *gin.Engine
 	tlsConfig  *tls.Config
-
-	mirrorStatus map[string]SyncStatus
 }
 
 // GetTUNASyncWorker returns a singleton worker
@@ -46,8 +44,7 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
 		managerChan: make(chan jobMessage, 32),
 		semaphore:   make(chan empty, cfg.Global.Concurrent),
 
-		schedule:     newScheduleQueue(),
-		mirrorStatus: make(map[string]SyncStatus),
+		schedule: newScheduleQueue(),
 	}
 
 	if cfg.Manager.CACert != "" {
@@ -89,6 +86,9 @@ func (w *Worker) initProviders() {
 				c.Global.MirrorDir, mirror.Name,
 			)
 		}
+		if mirror.Interval == 0 {
+			mirror.Interval = c.Global.Interval
+		}
 		logDir = formatLogDir(logDir, mirror)
 
 		var provider mirrorProvider
@@ -163,8 +163,6 @@ func (w *Worker) initJobs() {
 
 	for name, provider := range w.providers {
 		w.jobs[name] = newMirrorJob(provider)
-		go w.jobs[name].Run(w.managerChan, w.semaphore)
-		w.mirrorStatus[name] = Paused
 	}
 }
 
@@ -185,24 +183,40 @@ func (w *Worker) makeHTTPServer() {
 			c.JSON(http.StatusNotFound, gin.H{"msg": fmt.Sprintf("Mirror ``%s'' not found", cmd.MirrorID)})
 			return
 		}
+		logger.Info("Received command: %v", cmd)
 		// if the job is disabled, start it first
 		switch cmd.Cmd {
 		case CmdStart, CmdRestart:
-			if job.Disabled() {
+			if job.isDisabled {
 				go job.Run(w.managerChan, w.semaphore)
 			}
 		}
 		switch cmd.Cmd {
 		case CmdStart:
+			job.schedule = true
+			job.isDisabled = false
 			job.ctrlChan <- jobStart
-		case CmdStop:
-			job.ctrlChan <- jobStop
 		case CmdRestart:
+			job.schedule = true
+			job.isDisabled = false
 			job.ctrlChan <- jobRestart
+		case CmdStop:
+			// if job is disabled, no goroutine would be there
+			// receiving this signal
+			if !job.isDisabled {
+				job.schedule = false
+				job.isDisabled = false
+				w.schedule.Remove(job.Name())
+				job.ctrlChan <- jobStop
+			}
 		case CmdDisable:
-			w.schedule.Remove(job.Name())
-			job.ctrlChan <- jobDisable
-			<-job.disabled
+			if !job.isDisabled {
+				job.schedule = false
+				job.isDisabled = true
+				w.schedule.Remove(job.Name())
+				job.ctrlChan <- jobDisable
+				<-job.disabled
+			}
 		case CmdPing:
 			job.ctrlChan <- jobStart
 		default:
@@ -243,15 +257,32 @@ func (w *Worker) runSchedule() {
 	for name := range w.jobs {
 		unset[name] = true
 	}
+	// Fetch mirror list stored in the manager
+	// put it on the scheduled time
+	// if it's disabled, ignore it
 	for _, m := range mirrorList {
 		if job, ok := w.jobs[m.Name]; ok {
-			stime := m.LastUpdate.Add(job.provider.Interval())
-			w.schedule.AddJob(stime, job)
 			delete(unset, m.Name)
+			switch m.Status {
+			case Paused:
+				go job.Run(w.managerChan, w.semaphore)
+				job.schedule = false
+				continue
+			case Disabled:
+				job.schedule = false
+				job.isDisabled = true
+				continue
+			default:
+				go job.Run(w.managerChan, w.semaphore)
+				stime := m.LastUpdate.Add(job.provider.Interval())
+				logger.Debug("Scheduling job %s @%s", job.Name(), stime.Format("2006-01-02 15:04:05"))
+				w.schedule.AddJob(stime, job)
+			}
 		}
 	}
 	for name := range unset {
 		job := w.jobs[name]
+		go job.Run(w.managerChan, w.semaphore)
 		w.schedule.AddJob(time.Now(), job)
 	}
 
@@ -259,22 +290,26 @@ func (w *Worker) runSchedule() {
 		select {
 		case jobMsg := <-w.managerChan:
 			// got status update from job
-			w.updateStatus(jobMsg)
-			status := w.mirrorStatus[jobMsg.name]
-			if status == Disabled || status == Paused {
+			job := w.jobs[jobMsg.name]
+			if !job.schedule {
+				logger.Info("Job %s disabled/paused, skip adding new schedule", jobMsg.name)
 				continue
 			}
-			w.mirrorStatus[jobMsg.name] = jobMsg.status
-			switch jobMsg.status {
-			case Success, Failed:
-				job := w.jobs[jobMsg.name]
-				w.schedule.AddJob(
-					time.Now().Add(job.provider.Interval()),
-					job,
+
+			w.updateStatus(jobMsg)
+
+			if jobMsg.schedule {
+				schedTime := time.Now().Add(job.provider.Interval())
+				logger.Info(
+					"Next scheduled time for %s: %s",
+					job.Name(),
+					schedTime.Format("2006-01-02 15:04:05"),
 				)
+				w.schedule.AddJob(schedTime, job)
 			}
 
-		case <-time.Tick(10 * time.Second):
+		case <-time.Tick(5 * time.Second):
+			// check schedule every 5 seconds
 			if job := w.schedule.Pop(); job != nil {
 				job.ctrlChan <- jobStart
 			}
@@ -324,14 +359,13 @@ func (w *Worker) updateStatus(jobMsg jobMessage) {
 	)
 	p := w.providers[jobMsg.name]
 	smsg := MirrorStatus{
-		Name:       jobMsg.name,
-		Worker:     w.cfg.Global.Name,
-		IsMaster:   true,
-		Status:     jobMsg.status,
-		LastUpdate: time.Now(),
-		Upstream:   p.Upstream(),
-		Size:       "unknown",
-		ErrorMsg:   jobMsg.msg,
+		Name:     jobMsg.name,
+		Worker:   w.cfg.Global.Name,
+		IsMaster: true,
+		Status:   jobMsg.status,
+		Upstream: p.Upstream(),
+		Size:     "unknown",
+		ErrorMsg: jobMsg.msg,
 	}
 
 	if _, err := PostJSON(url, smsg, w.tlsConfig); err != nil {