job.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. tunasync "github.com/tuna/tunasync/internal"
  6. )
  7. // this file contains the workflow of a mirror job
  8. type ctrlAction uint8
  9. const (
  10. jobStart ctrlAction = iota
  11. jobStop // stop syncing keep the job
  12. jobDisable // disable the job (stops goroutine)
  13. jobRestart // restart syncing
  14. jobPing // ensure the goroutine is alive
  15. )
  16. type jobMessage struct {
  17. status tunasync.SyncStatus
  18. name string
  19. msg string
  20. schedule bool
  21. }
  22. type mirrorJob struct {
  23. provider mirrorProvider
  24. ctrlChan chan ctrlAction
  25. disabled chan empty
  26. started bool
  27. schedule bool
  28. isDisabled bool
  29. }
  30. func newMirrorJob(provider mirrorProvider) *mirrorJob {
  31. return &mirrorJob{
  32. provider: provider,
  33. ctrlChan: make(chan ctrlAction, 1),
  34. started: false,
  35. schedule: false,
  36. isDisabled: false,
  37. }
  38. }
  39. func (m *mirrorJob) Name() string {
  40. return m.provider.Name()
  41. }
  42. // runMirrorJob is the goroutine where syncing job runs in
  43. // arguments:
  44. // provider: mirror provider object
  45. // ctrlChan: receives messages from the manager
  46. // managerChan: push messages to the manager, this channel should have a larger buffer
  47. // sempaphore: make sure the concurrent running syncing job won't explode
  48. // TODO: message struct for managerChan
  49. func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
  50. m.disabled = make(chan empty)
  51. defer func() {
  52. close(m.disabled)
  53. m.schedule = false
  54. m.isDisabled = true
  55. }()
  56. provider := m.provider
  57. // to make code shorter
  58. runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
  59. for _, hook := range Hooks {
  60. if err := action(hook); err != nil {
  61. logger.Error(
  62. "failed at %s hooks for %s: %s",
  63. hookname, m.Name(), err.Error(),
  64. )
  65. managerChan <- jobMessage{
  66. tunasync.Failed, m.Name(),
  67. fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
  68. false,
  69. }
  70. return err
  71. }
  72. }
  73. return nil
  74. }
  75. runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
  76. defer close(jobDone)
  77. managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), "", false}
  78. logger.Info("start syncing: %s", m.Name())
  79. Hooks := provider.Hooks()
  80. rHooks := []jobHook{}
  81. for i := len(Hooks); i > 0; i-- {
  82. rHooks = append(rHooks, Hooks[i-1])
  83. }
  84. logger.Debug("hooks: pre-job")
  85. err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
  86. if err != nil {
  87. return err
  88. }
  89. for retry := 0; retry < maxRetry; retry++ {
  90. stopASAP := false // stop job as soon as possible
  91. if retry > 0 {
  92. logger.Info("retry syncing: %s, retry: %d", m.Name(), retry)
  93. }
  94. err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
  95. if err != nil {
  96. return err
  97. }
  98. // start syncing
  99. managerChan <- jobMessage{tunasync.Syncing, m.Name(), "", false}
  100. var syncErr error
  101. syncDone := make(chan error, 1)
  102. go func() {
  103. err := provider.Run()
  104. if !stopASAP {
  105. syncDone <- err
  106. }
  107. }()
  108. select {
  109. case syncErr = <-syncDone:
  110. logger.Debug("syncing done")
  111. case <-kill:
  112. logger.Debug("received kill")
  113. stopASAP = true
  114. err := provider.Terminate()
  115. if err != nil {
  116. logger.Error("failed to terminate provider %s: %s", m.Name(), err.Error())
  117. return err
  118. }
  119. syncErr = errors.New("killed by manager")
  120. }
  121. // post-exec hooks
  122. herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
  123. if herr != nil {
  124. return herr
  125. }
  126. if syncErr == nil {
  127. // syncing success
  128. logger.Info("succeeded syncing %s", m.Name())
  129. managerChan <- jobMessage{tunasync.Success, m.Name(), "", true}
  130. // post-success hooks
  131. err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
  132. if err != nil {
  133. return err
  134. }
  135. return nil
  136. }
  137. // syncing failed
  138. logger.Warning("failed syncing %s: %s", m.Name(), syncErr.Error())
  139. managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), retry == maxRetry-1}
  140. // post-fail hooks
  141. logger.Debug("post-fail hooks")
  142. err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
  143. if err != nil {
  144. return err
  145. }
  146. // gracefully exit
  147. if stopASAP {
  148. logger.Debug("No retry, exit directly")
  149. return nil
  150. }
  151. // continue to next retry
  152. } // for retry
  153. return nil
  154. }
  155. runJob := func(kill <-chan empty, jobDone chan<- empty) {
  156. select {
  157. case semaphore <- empty{}:
  158. defer func() { <-semaphore }()
  159. runJobWrapper(kill, jobDone)
  160. case <-kill:
  161. jobDone <- empty{}
  162. return
  163. }
  164. }
  165. for {
  166. if m.started {
  167. kill := make(chan empty)
  168. jobDone := make(chan empty)
  169. go runJob(kill, jobDone)
  170. _wait_for_job:
  171. select {
  172. case <-jobDone:
  173. logger.Debug("job done")
  174. case ctrl := <-m.ctrlChan:
  175. switch ctrl {
  176. case jobStop:
  177. m.schedule = false
  178. m.started = false
  179. close(kill)
  180. <-jobDone
  181. case jobDisable:
  182. m.schedule = false
  183. m.isDisabled = true
  184. m.started = false
  185. close(kill)
  186. <-jobDone
  187. return nil
  188. case jobRestart:
  189. m.started = true
  190. close(kill)
  191. <-jobDone
  192. continue
  193. case jobStart:
  194. m.started = true
  195. goto _wait_for_job
  196. default:
  197. // TODO: implement this
  198. close(kill)
  199. return nil
  200. }
  201. }
  202. }
  203. ctrl := <-m.ctrlChan
  204. switch ctrl {
  205. case jobStop:
  206. m.schedule = false
  207. m.started = false
  208. case jobDisable:
  209. m.schedule = false
  210. m.isDisabled = true
  211. m.started = false
  212. return nil
  213. case jobRestart:
  214. m.schedule = true
  215. m.isDisabled = false
  216. m.started = true
  217. case jobStart:
  218. m.schedule = true
  219. m.isDisabled = false
  220. m.started = true
  221. default:
  222. // TODO
  223. return nil
  224. }
  225. }
  226. }