job.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package worker
  2. import "errors"
  3. // this file contains the workflow of a mirror job
  4. type ctrlAction uint8
  5. const (
  6. jobStart ctrlAction = iota
  7. jobStop // stop syncing keep the job
  8. jobDisable // disable the job (stops goroutine)
  9. jobRestart // restart syncing
  10. jobPing // ensure the goroutine is alive
  11. )
  12. // runMirrorJob is the goroutine where syncing job runs in
  13. // arguments:
  14. // provider: mirror provider object
  15. // ctrlChan: receives messages from the manager
  16. // managerChan: push messages to the manager
  17. // sempaphore: make sure the concurrent running syncing job won't explode
  18. // TODO: message struct for managerChan
  19. func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerChan chan<- struct{}, semaphore chan empty) error {
  20. // to make code shorter
  21. runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
  22. for _, hook := range Hooks {
  23. if err := action(hook); err != nil {
  24. logger.Error(
  25. "failed at %s hooks for %s: %s",
  26. hookname, provider.Name(), err.Error(),
  27. )
  28. return err
  29. }
  30. }
  31. return nil
  32. }
  33. runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
  34. defer close(jobDone)
  35. logger.Info("start syncing: %s", provider.Name())
  36. Hooks := provider.Hooks()
  37. rHooks := []jobHook{}
  38. for i := len(Hooks); i > 0; i-- {
  39. rHooks = append(rHooks, Hooks[i-1])
  40. }
  41. logger.Debug("hooks: pre-job")
  42. err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
  43. if err != nil {
  44. return err
  45. }
  46. for retry := 0; retry < maxRetry; retry++ {
  47. stopASAP := false // stop job as soon as possible
  48. if retry > 0 {
  49. logger.Info("retry syncing: %s, retry: %d", provider.Name(), retry)
  50. }
  51. err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
  52. if err != nil {
  53. return err
  54. }
  55. // start syncing
  56. err = provider.Start()
  57. if err != nil {
  58. logger.Error(
  59. "failed to start syncing job for %s: %s",
  60. provider.Name(), err.Error(),
  61. )
  62. return err
  63. }
  64. var syncErr error
  65. syncDone := make(chan error, 1)
  66. go func() {
  67. err := provider.Wait()
  68. if !stopASAP {
  69. syncDone <- err
  70. }
  71. }()
  72. select {
  73. case syncErr = <-syncDone:
  74. logger.Debug("syncing done")
  75. case <-kill:
  76. logger.Debug("received kill")
  77. stopASAP = true
  78. err := provider.Terminate()
  79. if err != nil {
  80. logger.Error("failed to terminate provider %s: %s", provider.Name(), err.Error())
  81. return err
  82. }
  83. syncErr = errors.New("killed by manager")
  84. }
  85. // post-exec hooks
  86. herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
  87. if herr != nil {
  88. return herr
  89. }
  90. if syncErr == nil {
  91. // syncing success
  92. logger.Info("succeeded syncing %s", provider.Name())
  93. managerChan <- struct{}{}
  94. // post-success hooks
  95. err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
  96. if err != nil {
  97. return err
  98. }
  99. return nil
  100. }
  101. // syncing failed
  102. logger.Warning("failed syncing %s: %s", provider.Name(), syncErr.Error())
  103. managerChan <- struct{}{}
  104. // post-fail hooks
  105. logger.Debug("post-fail hooks")
  106. err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
  107. if err != nil {
  108. return err
  109. }
  110. // gracefully exit
  111. if stopASAP {
  112. logger.Debug("No retry, exit directly")
  113. return nil
  114. }
  115. // continue to next retry
  116. } // for retry
  117. return nil
  118. }
  119. runJob := func(kill <-chan empty, jobDone chan<- empty) {
  120. select {
  121. case <-semaphore:
  122. defer func() { semaphore <- empty{} }()
  123. runJobWrapper(kill, jobDone)
  124. case <-kill:
  125. jobDone <- empty{}
  126. return
  127. }
  128. }
  129. enabled := true // whether this job is stopped by the manager
  130. for {
  131. if enabled {
  132. kill := make(chan empty)
  133. jobDone := make(chan empty)
  134. go runJob(kill, jobDone)
  135. _wait_for_job:
  136. select {
  137. case <-jobDone:
  138. logger.Debug("job done")
  139. case ctrl := <-ctrlChan:
  140. switch ctrl {
  141. case jobStop:
  142. enabled = false
  143. close(kill)
  144. <-jobDone
  145. case jobDisable:
  146. close(kill)
  147. <-jobDone
  148. return nil
  149. case jobRestart:
  150. enabled = true
  151. close(kill)
  152. <-jobDone
  153. continue
  154. case jobStart:
  155. enabled = true
  156. goto _wait_for_job
  157. default:
  158. // TODO: implement this
  159. close(kill)
  160. return nil
  161. }
  162. }
  163. }
  164. ctrl := <-ctrlChan
  165. switch ctrl {
  166. case jobStop:
  167. enabled = false
  168. case jobDisable:
  169. return nil
  170. case jobRestart:
  171. enabled = true
  172. case jobStart:
  173. enabled = true
  174. default:
  175. // TODO
  176. return nil
  177. }
  178. }
  179. }