job.go 6.4 KB


  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. tunasync "github.com/tuna/tunasync/internal"
  9. )
  10. // this file contains the workflow of a mirror job
  11. type ctrlAction uint8
  12. const (
  13. jobStart ctrlAction = iota
  14. jobStop // stop syncing keep the job
  15. jobDisable // disable the job (stops goroutine)
  16. jobRestart // restart syncing
  17. jobPing // ensure the goroutine is alive
  18. jobHalt // worker halts
  19. )
  20. type jobMessage struct {
  21. status tunasync.SyncStatus
  22. name string
  23. msg string
  24. schedule bool
  25. }
  26. const (
  27. // empty state
  28. stateNone uint32 = iota
  29. // ready to run, able to schedule
  30. stateReady
  31. // paused by jobStop
  32. statePaused
  33. // disabled by jobDisable
  34. stateDisabled
  35. // worker is halting
  36. stateHalting
  37. )
  38. // use to ensure all jobs are finished before
  39. // worker exit
  40. var jobsDone sync.WaitGroup
  41. type mirrorJob struct {
  42. provider mirrorProvider
  43. ctrlChan chan ctrlAction
  44. disabled chan empty
  45. state uint32
  46. }
  47. func newMirrorJob(provider mirrorProvider) *mirrorJob {
  48. return &mirrorJob{
  49. provider: provider,
  50. ctrlChan: make(chan ctrlAction, 1),
  51. state: stateNone,
  52. }
  53. }
  54. func (m *mirrorJob) Name() string {
  55. return m.provider.Name()
  56. }
  57. func (m *mirrorJob) State() uint32 {
  58. return atomic.LoadUint32(&(m.state))
  59. }
  60. func (m *mirrorJob) SetState(state uint32) {
  61. atomic.StoreUint32(&(m.state), state)
  62. }
  63. func (m *mirrorJob) SetProvider(provider mirrorProvider) error {
  64. s := m.State()
  65. if (s != stateNone) && (s != stateDisabled) {
  66. return fmt.Errorf("Provider cannot be switched when job state is %d", s)
  67. }
  68. m.provider = provider
  69. return nil
  70. }
  71. // runMirrorJob is the goroutine where syncing job runs in
  72. // arguments:
  73. // provider: mirror provider object
  74. // ctrlChan: receives messages from the manager
  75. // managerChan: push messages to the manager, this channel should have a larger buffer
  76. // sempaphore: make sure the concurrent running syncing job won't explode
  77. // TODO: message struct for managerChan
  78. func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
  79. jobsDone.Add(1)
  80. m.disabled = make(chan empty)
  81. defer func() {
  82. close(m.disabled)
  83. jobsDone.Done()
  84. }()
  85. provider := m.provider
  86. // to make code shorter
  87. runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
  88. for _, hook := range Hooks {
  89. if err := action(hook); err != nil {
  90. logger.Errorf(
  91. "failed at %s hooks for %s: %s",
  92. hookname, m.Name(), err.Error(),
  93. )
  94. managerChan <- jobMessage{
  95. tunasync.Failed, m.Name(),
  96. fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
  97. false,
  98. }
  99. return err
  100. }
  101. }
  102. return nil
  103. }
  104. runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
  105. defer close(jobDone)
  106. managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), "", false}
  107. logger.Noticef("start syncing: %s", m.Name())
  108. Hooks := provider.Hooks()
  109. rHooks := []jobHook{}
  110. for i := len(Hooks); i > 0; i-- {
  111. rHooks = append(rHooks, Hooks[i-1])
  112. }
  113. logger.Debug("hooks: pre-job")
  114. err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
  115. if err != nil {
  116. return err
  117. }
  118. for retry := 0; retry < maxRetry; retry++ {
  119. stopASAP := false // stop job as soon as possible
  120. if retry > 0 {
  121. logger.Noticef("retry syncing: %s, retry: %d", m.Name(), retry)
  122. }
  123. err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
  124. if err != nil {
  125. return err
  126. }
  127. // start syncing
  128. managerChan <- jobMessage{tunasync.Syncing, m.Name(), "", false}
  129. var syncErr error
  130. syncDone := make(chan error, 1)
  131. go func() {
  132. err := provider.Run()
  133. syncDone <- err
  134. }()
  135. select {
  136. case syncErr = <-syncDone:
  137. logger.Debug("syncing done")
  138. case <-kill:
  139. logger.Debug("received kill")
  140. stopASAP = true
  141. err := provider.Terminate()
  142. if err != nil {
  143. logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error())
  144. return err
  145. }
  146. syncErr = errors.New("killed by manager")
  147. }
  148. // post-exec hooks
  149. herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
  150. if herr != nil {
  151. return herr
  152. }
  153. if syncErr == nil {
  154. // syncing success
  155. logger.Noticef("succeeded syncing %s", m.Name())
  156. managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
  157. // post-success hooks
  158. err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
  159. if err != nil {
  160. return err
  161. }
  162. return nil
  163. }
  164. // syncing failed
  165. logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
  166. managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)}
  167. // post-fail hooks
  168. logger.Debug("post-fail hooks")
  169. err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
  170. if err != nil {
  171. return err
  172. }
  173. // gracefully exit
  174. if stopASAP {
  175. logger.Debug("No retry, exit directly")
  176. return nil
  177. }
  178. // continue to next retry
  179. } // for retry
  180. return nil
  181. }
  182. runJob := func(kill <-chan empty, jobDone chan<- empty) {
  183. select {
  184. case semaphore <- empty{}:
  185. defer func() { <-semaphore }()
  186. runJobWrapper(kill, jobDone)
  187. case <-kill:
  188. jobDone <- empty{}
  189. return
  190. }
  191. }
  192. for {
  193. if m.State() == stateReady {
  194. kill := make(chan empty)
  195. jobDone := make(chan empty)
  196. go runJob(kill, jobDone)
  197. _wait_for_job:
  198. select {
  199. case <-jobDone:
  200. logger.Debug("job done")
  201. case ctrl := <-m.ctrlChan:
  202. switch ctrl {
  203. case jobStop:
  204. m.SetState(statePaused)
  205. close(kill)
  206. <-jobDone
  207. case jobDisable:
  208. m.SetState(stateDisabled)
  209. close(kill)
  210. <-jobDone
  211. return nil
  212. case jobRestart:
  213. m.SetState(stateReady)
  214. close(kill)
  215. <-jobDone
  216. time.Sleep(time.Second) // Restart may fail if the process was not exited yet
  217. continue
  218. case jobStart:
  219. m.SetState(stateReady)
  220. goto _wait_for_job
  221. case jobHalt:
  222. m.SetState(stateHalting)
  223. close(kill)
  224. <-jobDone
  225. return nil
  226. default:
  227. // TODO: implement this
  228. close(kill)
  229. return nil
  230. }
  231. }
  232. }
  233. ctrl := <-m.ctrlChan
  234. switch ctrl {
  235. case jobStop:
  236. m.SetState(statePaused)
  237. case jobDisable:
  238. m.SetState(stateDisabled)
  239. return nil
  240. case jobRestart:
  241. m.SetState(stateReady)
  242. case jobStart:
  243. m.SetState(stateReady)
  244. default:
  245. // TODO
  246. return nil
  247. }
  248. }
  249. }