job.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync/atomic"
  6. tunasync "github.com/tuna/tunasync/internal"
  7. )
  8. // this file contains the workflow of a mirror jb
  9. type ctrlAction uint8
  10. const (
  11. jobStart ctrlAction = iota
  12. jobStop // stop syncing keep the job
  13. jobDisable // disable the job (stops goroutine)
  14. jobRestart // restart syncing
  15. jobPing // ensure the goroutine is alive
  16. )
  17. type jobMessage struct {
  18. status tunasync.SyncStatus
  19. name string
  20. msg string
  21. schedule bool
  22. }
  23. const (
  24. // empty state
  25. stateNone uint32 = iota
  26. // ready to run, able to schedule
  27. stateReady
  28. // paused by jobStop
  29. statePaused
  30. // disabled by jobDisable
  31. stateDisabled
  32. )
  33. type mirrorJob struct {
  34. provider mirrorProvider
  35. ctrlChan chan ctrlAction
  36. disabled chan empty
  37. state uint32
  38. }
  39. func newMirrorJob(provider mirrorProvider) *mirrorJob {
  40. return &mirrorJob{
  41. provider: provider,
  42. ctrlChan: make(chan ctrlAction, 1),
  43. state: stateNone,
  44. }
  45. }
  46. func (m *mirrorJob) Name() string {
  47. return m.provider.Name()
  48. }
  49. func (m *mirrorJob) State() uint32 {
  50. return atomic.LoadUint32(&(m.state))
  51. }
  52. func (m *mirrorJob) SetState(state uint32) {
  53. atomic.StoreUint32(&(m.state), state)
  54. }
  55. // runMirrorJob is the goroutine where syncing job runs in
  56. // arguments:
  57. // provider: mirror provider object
  58. // ctrlChan: receives messages from the manager
  59. // managerChan: push messages to the manager, this channel should have a larger buffer
  60. // sempaphore: make sure the concurrent running syncing job won't explode
  61. // TODO: message struct for managerChan
  62. func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
  63. m.disabled = make(chan empty)
  64. defer func() {
  65. close(m.disabled)
  66. m.SetState(stateDisabled)
  67. }()
  68. provider := m.provider
  69. // to make code shorter
  70. runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
  71. for _, hook := range Hooks {
  72. if err := action(hook); err != nil {
  73. logger.Errorf(
  74. "failed at %s hooks for %s: %s",
  75. hookname, m.Name(), err.Error(),
  76. )
  77. managerChan <- jobMessage{
  78. tunasync.Failed, m.Name(),
  79. fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
  80. false,
  81. }
  82. return err
  83. }
  84. }
  85. return nil
  86. }
  87. runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
  88. defer close(jobDone)
  89. managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), "", false}
  90. logger.Noticef("start syncing: %s", m.Name())
  91. Hooks := provider.Hooks()
  92. rHooks := []jobHook{}
  93. for i := len(Hooks); i > 0; i-- {
  94. rHooks = append(rHooks, Hooks[i-1])
  95. }
  96. logger.Debug("hooks: pre-job")
  97. err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
  98. if err != nil {
  99. return err
  100. }
  101. for retry := 0; retry < maxRetry; retry++ {
  102. stopASAP := false // stop job as soon as possible
  103. if retry > 0 {
  104. logger.Noticef("retry syncing: %s, retry: %d", m.Name(), retry)
  105. }
  106. err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
  107. if err != nil {
  108. return err
  109. }
  110. // start syncing
  111. managerChan <- jobMessage{tunasync.Syncing, m.Name(), "", false}
  112. var syncErr error
  113. syncDone := make(chan error, 1)
  114. go func() {
  115. err := provider.Run()
  116. if !stopASAP {
  117. syncDone <- err
  118. }
  119. }()
  120. select {
  121. case syncErr = <-syncDone:
  122. logger.Debug("syncing done")
  123. case <-kill:
  124. logger.Debug("received kill")
  125. stopASAP = true
  126. err := provider.Terminate()
  127. if err != nil {
  128. logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error())
  129. return err
  130. }
  131. syncErr = errors.New("killed by manager")
  132. }
  133. // post-exec hooks
  134. herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
  135. if herr != nil {
  136. return herr
  137. }
  138. if syncErr == nil {
  139. // syncing success
  140. logger.Noticef("succeeded syncing %s", m.Name())
  141. managerChan <- jobMessage{tunasync.Success, m.Name(), "", true}
  142. // post-success hooks
  143. err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
  144. if err != nil {
  145. return err
  146. }
  147. return nil
  148. }
  149. // syncing failed
  150. logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
  151. managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), retry == maxRetry-1}
  152. // post-fail hooks
  153. logger.Debug("post-fail hooks")
  154. err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
  155. if err != nil {
  156. return err
  157. }
  158. // gracefully exit
  159. if stopASAP {
  160. logger.Debug("No retry, exit directly")
  161. return nil
  162. }
  163. // continue to next retry
  164. } // for retry
  165. return nil
  166. }
  167. runJob := func(kill <-chan empty, jobDone chan<- empty) {
  168. select {
  169. case semaphore <- empty{}:
  170. defer func() { <-semaphore }()
  171. runJobWrapper(kill, jobDone)
  172. case <-kill:
  173. jobDone <- empty{}
  174. return
  175. }
  176. }
  177. for {
  178. if m.State() == stateReady {
  179. kill := make(chan empty)
  180. jobDone := make(chan empty)
  181. go runJob(kill, jobDone)
  182. _wait_for_job:
  183. select {
  184. case <-jobDone:
  185. logger.Debug("job done")
  186. case ctrl := <-m.ctrlChan:
  187. switch ctrl {
  188. case jobStop:
  189. m.SetState(statePaused)
  190. close(kill)
  191. <-jobDone
  192. case jobDisable:
  193. m.SetState(stateDisabled)
  194. close(kill)
  195. <-jobDone
  196. return nil
  197. case jobRestart:
  198. m.SetState(stateReady)
  199. close(kill)
  200. <-jobDone
  201. continue
  202. case jobStart:
  203. m.SetState(stateReady)
  204. goto _wait_for_job
  205. default:
  206. // TODO: implement this
  207. close(kill)
  208. return nil
  209. }
  210. }
  211. }
  212. ctrl := <-m.ctrlChan
  213. switch ctrl {
  214. case jobStop:
  215. m.SetState(statePaused)
  216. case jobDisable:
  217. m.SetState(stateDisabled)
  218. return nil
  219. case jobRestart:
  220. m.SetState(stateReady)
  221. case jobStart:
  222. m.SetState(stateReady)
  223. default:
  224. // TODO
  225. return nil
  226. }
  227. }
  228. }