Browse Source

new command: jobForceStart

Yuxiang Zhang 7 years ago
parent
commit
6cbe91b4f1
1 changed files with 25 additions and 9 deletions
  1. 25 9
      worker/job.go

+ 25 - 9
worker/job.go

@@ -15,12 +15,13 @@ import (
 type ctrlAction uint8
 
 const (
-	jobStart   ctrlAction = iota
-	jobStop               // stop syncing keep the job
-	jobDisable            // disable the job (stops goroutine)
-	jobRestart            // restart syncing
-	jobPing               // ensure the goroutine is alive
-	jobHalt               // worker halts
+	jobStart      ctrlAction = iota
+	jobStop                  // stop syncing keep the job
+	jobDisable               // disable the job (stops goroutine)
+	jobRestart               // restart syncing
+	jobPing                  // ensure the goroutine is alive
+	jobHalt                  // worker halts
+	jobForceStart            // ignore concurrent limit
 )
 
 type jobMessage struct {
@@ -211,22 +212,25 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 		return nil
 	}
 
-	runJob := func(kill <-chan empty, jobDone chan<- empty) {
+	runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) {
 		select {
 		case semaphore <- empty{}:
 			defer func() { <-semaphore }()
 			runJobWrapper(kill, jobDone)
+		case <-bypassSemaphore:
+			runJobWrapper(kill, jobDone)
 		case <-kill:
 			jobDone <- empty{}
 			return
 		}
 	}
 
+	bypassSemaphore := make(chan empty, 1)
 	for {
 		if m.State() == stateReady {
 			kill := make(chan empty)
 			jobDone := make(chan empty)
-			go runJob(kill, jobDone)
+			go runJob(kill, jobDone, bypassSemaphore)
 
 		_wait_for_job:
 			select {
@@ -249,6 +253,12 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 					<-jobDone
 					time.Sleep(time.Second) // Restart may fail if the process was not exited yet
 					continue
+				case jobForceStart:
+					select { //non-blocking
+					default:
+					case bypassSemaphore <- empty{}:
+					}
+					fallthrough
 				case jobStart:
 					m.SetState(stateReady)
 					goto _wait_for_job
@@ -272,8 +282,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 		case jobDisable:
 			m.SetState(stateDisabled)
 			return nil
+		case jobForceStart:
+			select { //non-blocking
+			default:
+			case bypassSemaphore <- empty{}:
+			}
+			fallthrough
 		case jobRestart:
-			m.SetState(stateReady)
+			fallthrough
 		case jobStart:
 			m.SetState(stateReady)
 		default: