2
0

job.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. tunasync "github.com/tuna/tunasync/internal"
  6. )
  7. // this file contains the workflow of a mirror job
  8. type ctrlAction uint8
  9. const (
  10. jobStart ctrlAction = iota
  11. jobStop // stop syncing keep the job
  12. jobDisable // disable the job (stops goroutine)
  13. jobRestart // restart syncing
  14. jobPing // ensure the goroutine is alive
  15. )
  16. type jobMessage struct {
  17. status tunasync.SyncStatus
  18. name string
  19. msg string
  20. }
  21. type mirrorJob struct {
  22. provider mirrorProvider
  23. ctrlChan chan ctrlAction
  24. disabled chan empty
  25. enabled bool
  26. }
  27. func newMirrorJob(provider mirrorProvider) *mirrorJob {
  28. return &mirrorJob{
  29. provider: provider,
  30. ctrlChan: make(chan ctrlAction, 1),
  31. enabled: false,
  32. }
  33. }
  34. func (m *mirrorJob) Name() string {
  35. return m.provider.Name()
  36. }
  37. func (m *mirrorJob) Disabled() bool {
  38. if !m.enabled {
  39. return true
  40. }
  41. select {
  42. case <-m.disabled:
  43. return true
  44. default:
  45. return false
  46. }
  47. }
  48. // runMirrorJob is the goroutine where syncing job runs in
  49. // arguments:
  50. // provider: mirror provider object
  51. // ctrlChan: receives messages from the manager
  52. // managerChan: push messages to the manager, this channel should have a larger buffer
  53. // sempaphore: make sure the concurrent running syncing job won't explode
  54. // TODO: message struct for managerChan
  55. func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
  56. m.disabled = make(chan empty)
  57. defer close(m.disabled)
  58. provider := m.provider
  59. // to make code shorter
  60. runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
  61. for _, hook := range Hooks {
  62. if err := action(hook); err != nil {
  63. logger.Error(
  64. "failed at %s hooks for %s: %s",
  65. hookname, m.Name(), err.Error(),
  66. )
  67. managerChan <- jobMessage{
  68. tunasync.Failed, m.Name(),
  69. fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
  70. }
  71. return err
  72. }
  73. }
  74. return nil
  75. }
  76. runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
  77. defer close(jobDone)
  78. managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), ""}
  79. logger.Info("start syncing: %s", m.Name())
  80. Hooks := provider.Hooks()
  81. rHooks := []jobHook{}
  82. for i := len(Hooks); i > 0; i-- {
  83. rHooks = append(rHooks, Hooks[i-1])
  84. }
  85. logger.Debug("hooks: pre-job")
  86. err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
  87. if err != nil {
  88. return err
  89. }
  90. for retry := 0; retry < maxRetry; retry++ {
  91. stopASAP := false // stop job as soon as possible
  92. if retry > 0 {
  93. logger.Info("retry syncing: %s, retry: %d", m.Name(), retry)
  94. }
  95. err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
  96. if err != nil {
  97. return err
  98. }
  99. // start syncing
  100. managerChan <- jobMessage{tunasync.Syncing, m.Name(), ""}
  101. var syncErr error
  102. syncDone := make(chan error, 1)
  103. go func() {
  104. err := provider.Run()
  105. if !stopASAP {
  106. syncDone <- err
  107. }
  108. }()
  109. select {
  110. case syncErr = <-syncDone:
  111. logger.Debug("syncing done")
  112. case <-kill:
  113. logger.Debug("received kill")
  114. stopASAP = true
  115. err := provider.Terminate()
  116. if err != nil {
  117. logger.Error("failed to terminate provider %s: %s", m.Name(), err.Error())
  118. return err
  119. }
  120. syncErr = errors.New("killed by manager")
  121. }
  122. // post-exec hooks
  123. herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
  124. if herr != nil {
  125. return herr
  126. }
  127. if syncErr == nil {
  128. // syncing success
  129. logger.Info("succeeded syncing %s", m.Name())
  130. managerChan <- jobMessage{tunasync.Success, m.Name(), ""}
  131. // post-success hooks
  132. err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
  133. if err != nil {
  134. return err
  135. }
  136. return nil
  137. }
  138. // syncing failed
  139. logger.Warning("failed syncing %s: %s", m.Name(), syncErr.Error())
  140. managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error()}
  141. // post-fail hooks
  142. logger.Debug("post-fail hooks")
  143. err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
  144. if err != nil {
  145. return err
  146. }
  147. // gracefully exit
  148. if stopASAP {
  149. logger.Debug("No retry, exit directly")
  150. return nil
  151. }
  152. // continue to next retry
  153. } // for retry
  154. return nil
  155. }
  156. runJob := func(kill <-chan empty, jobDone chan<- empty) {
  157. select {
  158. case semaphore <- empty{}:
  159. defer func() { <-semaphore }()
  160. runJobWrapper(kill, jobDone)
  161. case <-kill:
  162. jobDone <- empty{}
  163. return
  164. }
  165. }
  166. for {
  167. if m.enabled {
  168. kill := make(chan empty)
  169. jobDone := make(chan empty)
  170. go runJob(kill, jobDone)
  171. _wait_for_job:
  172. select {
  173. case <-jobDone:
  174. logger.Debug("job done")
  175. case ctrl := <-m.ctrlChan:
  176. switch ctrl {
  177. case jobStop:
  178. m.enabled = false
  179. close(kill)
  180. <-jobDone
  181. case jobDisable:
  182. m.enabled = false
  183. close(kill)
  184. <-jobDone
  185. return nil
  186. case jobRestart:
  187. m.enabled = true
  188. close(kill)
  189. <-jobDone
  190. continue
  191. case jobStart:
  192. m.enabled = true
  193. goto _wait_for_job
  194. default:
  195. // TODO: implement this
  196. close(kill)
  197. return nil
  198. }
  199. }
  200. }
  201. ctrl := <-m.ctrlChan
  202. switch ctrl {
  203. case jobStop:
  204. m.enabled = false
  205. case jobDisable:
  206. m.enabled = false
  207. return nil
  208. case jobRestart:
  209. m.enabled = true
  210. case jobStart:
  211. m.enabled = true
  212. default:
  213. // TODO
  214. return nil
  215. }
  216. }
  217. }