job.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync/atomic"
  6. tunasync "github.com/tuna/tunasync/internal"
  7. )
  8. // this file contains the workflow of a mirror job
  // ctrlAction is a control command delivered to a mirrorJob through its
  // ctrlChan and interpreted by mirrorJob.Run.
  type ctrlAction uint8

  // Control commands, numbered from 0 in declaration order.
  const (
  	jobStart   ctrlAction = iota // mark the job ready so Run starts (or keeps) syncing
  	jobStop                      // stop syncing keep the job
  	jobDisable                   // disable the job (stops goroutine)
  	jobRestart                   // restart syncing
  	// ensure the goroutine is alive
  	// NOTE(review): Run has no dedicated case for jobPing; it currently falls
  	// into Run's default branch.
  	jobPing
  )
  // jobMessage is the status report a mirror job pushes to the manager channel.
  // Run builds it with unkeyed literals, so the field order must not change.
  type jobMessage struct {
  	status tunasync.SyncStatus // status being reported, e.g. tunasync.Syncing or tunasync.Failed
  	name   string              // job name as returned by mirrorJob.Name
  	msg    string              // error detail; empty when there is nothing to report
  	// schedule is only ever set by Run while the job is in stateReady.
  	// Presumably it asks the manager to schedule the next run — confirm
  	// against the manager code.
  	schedule bool
  }
  // Job states kept in mirrorJob.state. They are read and written through
  // mirrorJob.State and mirrorJob.SetState.
  const (
  	// empty state; this is what newMirrorJob initialises a job with
  	stateNone uint32 = iota
  	// ready to run, able to schedule
  	stateReady
  	// paused by jobStop
  	statePaused
  	// disabled by jobDisable; also set when Run returns
  	stateDisabled
  )
  // mirrorJob couples a mirrorProvider with the control channel and state that
  // Run uses to drive the provider's sync workflow.
  type mirrorJob struct {
  	provider mirrorProvider  // provider whose Hooks/Run/Terminate are invoked by Run
  	ctrlChan chan ctrlAction // control actions consumed by Run; created with a buffer of 1
  	disabled chan empty      // created when Run starts and closed when Run returns
  	state    uint32          // one of the state* constants; accessed atomically via State/SetState
  }
  39. func newMirrorJob(provider mirrorProvider) *mirrorJob {
  40. return &mirrorJob{
  41. provider: provider,
  42. ctrlChan: make(chan ctrlAction, 1),
  43. state: stateNone,
  44. }
  45. }
  46. func (m *mirrorJob) Name() string {
  47. return m.provider.Name()
  48. }
  49. func (m *mirrorJob) State() uint32 {
  50. return atomic.LoadUint32(&(m.state))
  51. }
  52. func (m *mirrorJob) SetState(state uint32) {
  53. atomic.StoreUint32(&(m.state), state)
  54. }
  55. func (m *mirrorJob) SetProvider(provider mirrorProvider) error {
  56. s := m.State()
  57. if (s != stateNone) && (s != stateDisabled) {
  58. return fmt.Errorf("Provider cannot be switched when job state is %d", s)
  59. }
  60. m.provider = provider
  61. return nil
  62. }
  63. // runMirrorJob is the goroutine where syncing job runs in
  64. // arguments:
  65. // provider: mirror provider object
  66. // ctrlChan: receives messages from the manager
  67. // managerChan: push messages to the manager, this channel should have a larger buffer
  68. // sempaphore: make sure the concurrent running syncing job won't explode
  69. // TODO: message struct for managerChan
  70. func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
  71. m.disabled = make(chan empty)
  72. defer func() {
  73. close(m.disabled)
  74. m.SetState(stateDisabled)
  75. }()
  76. provider := m.provider
  77. // to make code shorter
  78. runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
  79. for _, hook := range Hooks {
  80. if err := action(hook); err != nil {
  81. logger.Errorf(
  82. "failed at %s hooks for %s: %s",
  83. hookname, m.Name(), err.Error(),
  84. )
  85. managerChan <- jobMessage{
  86. tunasync.Failed, m.Name(),
  87. fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
  88. false,
  89. }
  90. return err
  91. }
  92. }
  93. return nil
  94. }
  95. runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
  96. defer close(jobDone)
  97. managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), "", false}
  98. logger.Noticef("start syncing: %s", m.Name())
  99. Hooks := provider.Hooks()
  100. rHooks := []jobHook{}
  101. for i := len(Hooks); i > 0; i-- {
  102. rHooks = append(rHooks, Hooks[i-1])
  103. }
  104. logger.Debug("hooks: pre-job")
  105. err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
  106. if err != nil {
  107. return err
  108. }
  109. for retry := 0; retry < maxRetry; retry++ {
  110. stopASAP := false // stop job as soon as possible
  111. if retry > 0 {
  112. logger.Noticef("retry syncing: %s, retry: %d", m.Name(), retry)
  113. }
  114. err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
  115. if err != nil {
  116. return err
  117. }
  118. // start syncing
  119. managerChan <- jobMessage{tunasync.Syncing, m.Name(), "", false}
  120. var syncErr error
  121. syncDone := make(chan error, 1)
  122. go func() {
  123. err := provider.Run()
  124. if !stopASAP {
  125. syncDone <- err
  126. }
  127. }()
  128. select {
  129. case syncErr = <-syncDone:
  130. logger.Debug("syncing done")
  131. case <-kill:
  132. logger.Debug("received kill")
  133. stopASAP = true
  134. err := provider.Terminate()
  135. if err != nil {
  136. logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error())
  137. return err
  138. }
  139. syncErr = errors.New("killed by manager")
  140. }
  141. // post-exec hooks
  142. herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
  143. if herr != nil {
  144. return herr
  145. }
  146. if syncErr == nil {
  147. // syncing success
  148. logger.Noticef("succeeded syncing %s", m.Name())
  149. managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
  150. // post-success hooks
  151. err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
  152. if err != nil {
  153. return err
  154. }
  155. return nil
  156. }
  157. // syncing failed
  158. logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
  159. managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)}
  160. // post-fail hooks
  161. logger.Debug("post-fail hooks")
  162. err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
  163. if err != nil {
  164. return err
  165. }
  166. // gracefully exit
  167. if stopASAP {
  168. logger.Debug("No retry, exit directly")
  169. return nil
  170. }
  171. // continue to next retry
  172. } // for retry
  173. return nil
  174. }
  175. runJob := func(kill <-chan empty, jobDone chan<- empty) {
  176. select {
  177. case semaphore <- empty{}:
  178. defer func() { <-semaphore }()
  179. runJobWrapper(kill, jobDone)
  180. case <-kill:
  181. jobDone <- empty{}
  182. return
  183. }
  184. }
  185. for {
  186. if m.State() == stateReady {
  187. kill := make(chan empty)
  188. jobDone := make(chan empty)
  189. go runJob(kill, jobDone)
  190. _wait_for_job:
  191. select {
  192. case <-jobDone:
  193. logger.Debug("job done")
  194. case ctrl := <-m.ctrlChan:
  195. switch ctrl {
  196. case jobStop:
  197. m.SetState(statePaused)
  198. close(kill)
  199. <-jobDone
  200. case jobDisable:
  201. m.SetState(stateDisabled)
  202. close(kill)
  203. <-jobDone
  204. return nil
  205. case jobRestart:
  206. m.SetState(stateReady)
  207. close(kill)
  208. <-jobDone
  209. continue
  210. case jobStart:
  211. m.SetState(stateReady)
  212. goto _wait_for_job
  213. default:
  214. // TODO: implement this
  215. close(kill)
  216. return nil
  217. }
  218. }
  219. }
  220. ctrl := <-m.ctrlChan
  221. switch ctrl {
  222. case jobStop:
  223. m.SetState(statePaused)
  224. case jobDisable:
  225. m.SetState(stateDisabled)
  226. return nil
  227. case jobRestart:
  228. m.SetState(stateReady)
  229. case jobStart:
  230. m.SetState(stateReady)
  231. default:
  232. // TODO
  233. return nil
  234. }
  235. }
  236. }