job.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. tunasync "github.com/tuna/tunasync/internal"
  6. )
  7. // this file contains the workflow of a mirror job
  8. type ctrlAction uint8
  9. const (
  10. jobStart ctrlAction = iota
  11. jobStop // stop syncing keep the job
  12. jobDisable // disable the job (stops goroutine)
  13. jobRestart // restart syncing
  14. jobPing // ensure the goroutine is alive
  15. )
  16. type jobMessage struct {
  17. status tunasync.SyncStatus
  18. name string
  19. msg string
  20. }
  21. type mirrorJob struct {
  22. provider mirrorProvider
  23. ctrlChan chan ctrlAction
  24. enabled bool
  25. }
  26. func newMirrorJob(provider mirrorProvider) *mirrorJob {
  27. return &mirrorJob{
  28. provider: provider,
  29. ctrlChan: make(chan ctrlAction, 1),
  30. enabled: true,
  31. }
  32. }
  33. func (m *mirrorJob) Name() string {
  34. return m.provider.Name()
  35. }
  36. // runMirrorJob is the goroutine where syncing job runs in
  37. // arguments:
  38. // provider: mirror provider object
  39. // ctrlChan: receives messages from the manager
  40. // managerChan: push messages to the manager, this channel should have a larger buffer
  41. // sempaphore: make sure the concurrent running syncing job won't explode
  42. // TODO: message struct for managerChan
  43. func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
  44. provider := m.provider
  45. // to make code shorter
  46. runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
  47. for _, hook := range Hooks {
  48. if err := action(hook); err != nil {
  49. logger.Error(
  50. "failed at %s hooks for %s: %s",
  51. hookname, m.Name(), err.Error(),
  52. )
  53. managerChan <- jobMessage{
  54. tunasync.Failed, m.Name(),
  55. fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
  56. }
  57. return err
  58. }
  59. }
  60. return nil
  61. }
  62. runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
  63. defer close(jobDone)
  64. managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), ""}
  65. logger.Info("start syncing: %s", m.Name())
  66. Hooks := provider.Hooks()
  67. rHooks := []jobHook{}
  68. for i := len(Hooks); i > 0; i-- {
  69. rHooks = append(rHooks, Hooks[i-1])
  70. }
  71. logger.Debug("hooks: pre-job")
  72. err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
  73. if err != nil {
  74. return err
  75. }
  76. for retry := 0; retry < maxRetry; retry++ {
  77. stopASAP := false // stop job as soon as possible
  78. if retry > 0 {
  79. logger.Info("retry syncing: %s, retry: %d", m.Name(), retry)
  80. }
  81. err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
  82. if err != nil {
  83. return err
  84. }
  85. // start syncing
  86. managerChan <- jobMessage{tunasync.Syncing, m.Name(), ""}
  87. var syncErr error
  88. syncDone := make(chan error, 1)
  89. go func() {
  90. err := provider.Run()
  91. if !stopASAP {
  92. syncDone <- err
  93. }
  94. }()
  95. select {
  96. case syncErr = <-syncDone:
  97. logger.Debug("syncing done")
  98. case <-kill:
  99. logger.Debug("received kill")
  100. stopASAP = true
  101. err := provider.Terminate()
  102. if err != nil {
  103. logger.Error("failed to terminate provider %s: %s", m.Name(), err.Error())
  104. return err
  105. }
  106. syncErr = errors.New("killed by manager")
  107. }
  108. // post-exec hooks
  109. herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
  110. if herr != nil {
  111. return herr
  112. }
  113. if syncErr == nil {
  114. // syncing success
  115. logger.Info("succeeded syncing %s", m.Name())
  116. managerChan <- jobMessage{tunasync.Success, m.Name(), ""}
  117. // post-success hooks
  118. err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
  119. if err != nil {
  120. return err
  121. }
  122. return nil
  123. }
  124. // syncing failed
  125. logger.Warning("failed syncing %s: %s", m.Name(), syncErr.Error())
  126. managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error()}
  127. // post-fail hooks
  128. logger.Debug("post-fail hooks")
  129. err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
  130. if err != nil {
  131. return err
  132. }
  133. // gracefully exit
  134. if stopASAP {
  135. logger.Debug("No retry, exit directly")
  136. return nil
  137. }
  138. // continue to next retry
  139. } // for retry
  140. return nil
  141. }
  142. runJob := func(kill <-chan empty, jobDone chan<- empty) {
  143. select {
  144. case semaphore <- empty{}:
  145. defer func() { <-semaphore }()
  146. runJobWrapper(kill, jobDone)
  147. case <-kill:
  148. jobDone <- empty{}
  149. return
  150. }
  151. }
  152. for {
  153. if m.enabled {
  154. kill := make(chan empty)
  155. jobDone := make(chan empty)
  156. go runJob(kill, jobDone)
  157. _wait_for_job:
  158. select {
  159. case <-jobDone:
  160. logger.Debug("job done")
  161. case ctrl := <-m.ctrlChan:
  162. switch ctrl {
  163. case jobStop:
  164. m.enabled = false
  165. close(kill)
  166. <-jobDone
  167. case jobDisable:
  168. close(kill)
  169. <-jobDone
  170. return nil
  171. case jobRestart:
  172. m.enabled = true
  173. close(kill)
  174. <-jobDone
  175. continue
  176. case jobStart:
  177. m.enabled = true
  178. goto _wait_for_job
  179. default:
  180. // TODO: implement this
  181. close(kill)
  182. return nil
  183. }
  184. }
  185. }
  186. ctrl := <-m.ctrlChan
  187. switch ctrl {
  188. case jobStop:
  189. m.enabled = false
  190. case jobDisable:
  191. return nil
  192. case jobRestart:
  193. m.enabled = true
  194. case jobStart:
  195. m.enabled = true
  196. default:
  197. // TODO
  198. return nil
  199. }
  200. }
  201. }