job.go

package worker

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	tunasync "github.com/tuna/tunasync/internal"
)

// this file contains the workflow of a mirror job
type ctrlAction uint8

const (
	jobStart   ctrlAction = iota
	jobStop    // stop syncing, keep the job
	jobDisable // disable the job (stops the goroutine)
	jobRestart // restart syncing
	jobPing    // ensure the goroutine is alive
	jobHalt    // worker halts
)

type jobMessage struct {
	status   tunasync.SyncStatus
	name     string
	msg      string
	schedule bool
}
const (
	// empty state
	stateNone uint32 = iota
	// ready to run, able to schedule
	stateReady
	// paused by jobStop
	statePaused
	// disabled by jobDisable
	stateDisabled
	// worker is halting
	stateHalting
)
// used to ensure all jobs are finished before
// the worker exits
var jobsDone sync.WaitGroup
type mirrorJob struct {
	provider mirrorProvider
	ctrlChan chan ctrlAction
	disabled chan empty
	state    uint32
}

func newMirrorJob(provider mirrorProvider) *mirrorJob {
	return &mirrorJob{
		provider: provider,
		ctrlChan: make(chan ctrlAction, 1),
		state:    stateNone,
	}
}

func (m *mirrorJob) Name() string {
	return m.provider.Name()
}

func (m *mirrorJob) State() uint32 {
	return atomic.LoadUint32(&(m.state))
}

func (m *mirrorJob) SetState(state uint32) {
	atomic.StoreUint32(&(m.state), state)
}

func (m *mirrorJob) SetProvider(provider mirrorProvider) error {
	s := m.State()
	if (s != stateNone) && (s != stateDisabled) {
		return fmt.Errorf("Provider cannot be switched when job state is %d", s)
	}
	m.provider = provider
	return nil
}
// Run is the goroutine in which the syncing job runs.
// arguments:
//	provider: mirror provider object
//	ctrlChan: receives messages from the manager
//	managerChan: push messages to the manager; this channel should have a larger buffer
//	semaphore: make sure the number of concurrently running syncing jobs won't explode
// TODO: message struct for managerChan
func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
	jobsDone.Add(1)
	m.disabled = make(chan empty)
	defer func() {
		close(m.disabled)
		jobsDone.Done()
	}()

	provider := m.provider

	// to make code shorter
	runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
		for _, hook := range Hooks {
			if err := action(hook); err != nil {
				logger.Errorf(
					"failed at %s hooks for %s: %s",
					hookname, m.Name(), err.Error(),
				)
				managerChan <- jobMessage{
					tunasync.Failed, m.Name(),
					fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
					false,
				}
				return err
			}
		}
		return nil
	}
	runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
		defer close(jobDone)

		managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), "", false}
		logger.Noticef("start syncing: %s", m.Name())

		Hooks := provider.Hooks()
		// post-* hooks run in reverse order
		rHooks := []jobHook{}
		for i := len(Hooks); i > 0; i-- {
			rHooks = append(rHooks, Hooks[i-1])
		}

		logger.Debug("hooks: pre-job")
		err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
		if err != nil {
			return err
		}

		for retry := 0; retry < maxRetry; retry++ {
			stopASAP := false // stop job as soon as possible

			if retry > 0 {
				logger.Noticef("retry syncing: %s, retry: %d", m.Name(), retry)
			}
			err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
			if err != nil {
				return err
			}

			// start syncing
			managerChan <- jobMessage{tunasync.Syncing, m.Name(), "", false}

			var syncErr error
			syncDone := make(chan error, 1)
			go func() {
				err := provider.Run()
				if !stopASAP {
					syncDone <- err
				}
			}()

			// wait for the provider to finish, or for a kill signal
			select {
			case syncErr = <-syncDone:
				logger.Debug("syncing done")
			case <-kill:
				logger.Debug("received kill")
				stopASAP = true
				err := provider.Terminate()
				if err != nil {
					logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error())
					return err
				}
				syncErr = errors.New("killed by manager")
			}

			// post-exec hooks
			herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
			if herr != nil {
				return herr
			}

			if syncErr == nil {
				// syncing succeeded
				logger.Noticef("succeeded syncing %s", m.Name())
				managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
				// post-success hooks
				err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
				if err != nil {
					return err
				}
				return nil
			}

			// syncing failed
			logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
			managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)}

			// post-fail hooks
			logger.Debug("post-fail hooks")
			err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
			if err != nil {
				return err
			}
			// gracefully exit
			if stopASAP {
				logger.Debug("No retry, exit directly")
				return nil
			}
			// continue to the next retry
		} // for retry
		return nil
	}
	runJob := func(kill <-chan empty, jobDone chan<- empty) {
		// acquire a slot in the semaphore before running;
		// give up if killed while waiting
		select {
		case semaphore <- empty{}:
			defer func() { <-semaphore }()
			runJobWrapper(kill, jobDone)
		case <-kill:
			jobDone <- empty{}
			return
		}
	}
	for {
		if m.State() == stateReady {
			kill := make(chan empty)
			jobDone := make(chan empty)
			go runJob(kill, jobDone)

		_wait_for_job:
			select {
			case <-jobDone:
				logger.Debug("job done")

			case ctrl := <-m.ctrlChan:
				switch ctrl {
				case jobStop:
					m.SetState(statePaused)
					close(kill)
					<-jobDone
				case jobDisable:
					m.SetState(stateDisabled)
					close(kill)
					<-jobDone
					return nil
				case jobRestart:
					m.SetState(stateReady)
					close(kill)
					<-jobDone
					continue
				case jobStart:
					m.SetState(stateReady)
					goto _wait_for_job
				case jobHalt:
					m.SetState(stateHalting)
					close(kill)
					<-jobDone
					return nil
				default:
					// TODO: implement this
					close(kill)
					return nil
				}
			}
		}

		// job is not running: wait for the next control message
		ctrl := <-m.ctrlChan
		switch ctrl {
		case jobStop:
			m.SetState(statePaused)
		case jobDisable:
			m.SetState(stateDisabled)
			return nil
		case jobRestart:
			m.SetState(stateReady)
		case jobStart:
			m.SetState(stateReady)
		default:
			// TODO
			return nil
		}
	}
}
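
// Example (illustrative sketch, not part of the original file): how a worker
// might drive a mirrorJob with the control actions above. It assumes an
// already-constructed mirrorProvider p; the channel sizes are arbitrary and
// exampleDriveJob is a hypothetical helper, not tunasync's actual worker code.
func exampleDriveJob(p mirrorProvider) {
	managerChan := make(chan jobMessage, 32) // Run's doc comment asks for a larger buffer; a real worker drains it
	semaphore := make(chan empty, 4)         // limits how many syncing jobs run concurrently
	job := newMirrorJob(p)
	go job.Run(managerChan, semaphore)

	// Run blocks on ctrlChan until it is told to start.
	job.ctrlChan <- jobStart

	// ... later: pause the job but keep its goroutine alive.
	job.ctrlChan <- jobStop

	// ... finally: disable it. Run closes m.disabled on exit,
	// so receiving from it waits for the goroutine to finish.
	job.ctrlChan <- jobDisable
	<-job.disabled
}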