job.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package worker
  2. import (
  3. "errors"
  4. "time"
  5. )
  6. // This file contains the workflow of a mirror job.
  7. type ctrlAction uint8
  8. const (
  9. jobStart ctrlAction = iota
  10. jobStop // stop syncing keep the job
  11. jobDisable // disable the job (stops goroutine)
  12. jobRestart // restart syncing
  13. jobPing // ensure the goroutine is alive
  14. )
  15. // runMirrorJob is the goroutine where syncing job runs in
  16. // arguments:
  17. // provider: mirror provider object
  18. // ctrlChan: receives messages from the manager
  19. // managerChan: push messages to the manager
  20. // sempaphore: make sure the concurrent running syncing job won't explode
  21. // TODO: message struct for managerChan
  22. func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerChan chan<- struct{}, semaphore chan empty) error {
  23. // to make code shorter
  24. runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
  25. for _, hook := range Hooks {
  26. if err := action(hook); err != nil {
  27. logger.Error(
  28. "failed at %s hooks for %s: %s",
  29. hookname, provider.Name(), err.Error(),
  30. )
  31. return err
  32. }
  33. }
  34. return nil
  35. }
  36. runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
  37. defer func() { jobDone <- empty{} }()
  38. logger.Info("start syncing: %s", provider.Name())
  39. Hooks := provider.Hooks()
  40. rHooks := []jobHook{}
  41. for i := len(Hooks); i > 0; i-- {
  42. rHooks = append(rHooks, Hooks[i-1])
  43. }
  44. logger.Debug("hooks: pre-job")
  45. err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
  46. if err != nil {
  47. return err
  48. }
  49. for retry := 0; retry < maxRetry; retry++ {
  50. stopASAP := false // stop job as soon as possible
  51. if retry > 0 {
  52. logger.Info("retry syncing: %s, retry: %d", provider.Name(), retry)
  53. }
  54. err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
  55. if err != nil {
  56. return err
  57. }
  58. // start syncing
  59. err = provider.Start()
  60. if err != nil {
  61. logger.Error(
  62. "failed to start syncing job for %s: %s",
  63. provider.Name(), err.Error(),
  64. )
  65. return err
  66. }
  67. var syncErr error
  68. syncDone := make(chan error, 1)
  69. go func() {
  70. err := provider.Wait()
  71. if !stopASAP {
  72. syncDone <- err
  73. }
  74. }()
  75. select {
  76. case syncErr = <-syncDone:
  77. logger.Debug("syncing done")
  78. case <-kill:
  79. stopASAP = true
  80. err := provider.Terminate()
  81. if err != nil {
  82. logger.Error("failed to terminate provider %s: %s", provider.Name(), err.Error())
  83. return err
  84. }
  85. syncErr = errors.New("killed by manager")
  86. }
  87. // post-exec hooks
  88. herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
  89. if herr != nil {
  90. return herr
  91. }
  92. if syncErr == nil {
  93. // syncing success
  94. logger.Info("succeeded syncing %s", provider.Name())
  95. managerChan <- struct{}{}
  96. // post-success hooks
  97. err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
  98. if err != nil {
  99. return err
  100. }
  101. return nil
  102. }
  103. // syncing failed
  104. logger.Info("failed syncing %s: %s", provider.Name(), err.Error())
  105. managerChan <- struct{}{}
  106. // post-fail hooks
  107. err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
  108. if err != nil {
  109. return err
  110. }
  111. // gracefully exit
  112. if stopASAP {
  113. return nil
  114. }
  115. // continue to next retry
  116. } // for retry
  117. return nil
  118. }
  119. runJob := func(kill <-chan empty, jobDone chan<- empty) {
  120. select {
  121. case <-semaphore:
  122. defer func() { semaphore <- empty{} }()
  123. runJobWrapper(kill, jobDone)
  124. case <-kill:
  125. return
  126. }
  127. }
  128. enabled := true // whether this job is stopped by the manager
  129. for {
  130. if enabled {
  131. kill := make(chan empty)
  132. jobDone := make(chan empty)
  133. go runJob(kill, jobDone)
  134. _wait_for_job:
  135. select {
  136. case <-jobDone:
  137. logger.Debug("job done")
  138. case ctrl := <-ctrlChan:
  139. switch ctrl {
  140. case jobStop:
  141. enabled = false
  142. close(kill)
  143. case jobDisable:
  144. close(kill)
  145. return nil
  146. case jobRestart:
  147. enabled = true
  148. close(kill)
  149. continue
  150. case jobStart:
  151. enabled = true
  152. goto _wait_for_job
  153. default:
  154. // TODO: implement this
  155. close(kill)
  156. return nil
  157. }
  158. }
  159. }
  160. select {
  161. case <-time.After(provider.Interval()):
  162. continue
  163. case ctrl := <-ctrlChan:
  164. switch ctrl {
  165. case jobStop:
  166. enabled = false
  167. case jobDisable:
  168. return nil
  169. case jobRestart:
  170. enabled = true
  171. case jobStart:
  172. enabled = true
  173. default:
  174. // TODO
  175. return nil
  176. }
  177. }
  178. }
  179. return nil
  180. }