job.go 5.1 KB


  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. tunasync "github.com/tuna/tunasync/internal"
  6. )
  7. // this file contains the workflow of a mirror job
  8. type ctrlAction uint8
  9. const (
  10. jobStart ctrlAction = iota
  11. jobStop // stop syncing keep the job
  12. jobDisable // disable the job (stops goroutine)
  13. jobRestart // restart syncing
  14. jobPing // ensure the goroutine is alive
  15. )
  16. type jobMessage struct {
  17. status tunasync.SyncStatus
  18. name string
  19. msg string
  20. }
  21. // runMirrorJob is the goroutine where syncing job runs in
  22. // arguments:
  23. // provider: mirror provider object
  24. // ctrlChan: receives messages from the manager
  25. // managerChan: push messages to the manager, this channel should have a larger buffer
  26. // sempaphore: make sure the concurrent running syncing job won't explode
  27. // TODO: message struct for managerChan
  28. func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerChan chan<- jobMessage, semaphore chan empty) error {
  29. // to make code shorter
  30. runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
  31. for _, hook := range Hooks {
  32. if err := action(hook); err != nil {
  33. logger.Error(
  34. "failed at %s hooks for %s: %s",
  35. hookname, provider.Name(), err.Error(),
  36. )
  37. managerChan <- jobMessage{
  38. tunasync.Failed, provider.Name(),
  39. fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
  40. }
  41. return err
  42. }
  43. }
  44. return nil
  45. }
  46. runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
  47. defer close(jobDone)
  48. managerChan <- jobMessage{tunasync.PreSyncing, provider.Name(), ""}
  49. logger.Info("start syncing: %s", provider.Name())
  50. Hooks := provider.Hooks()
  51. rHooks := []jobHook{}
  52. for i := len(Hooks); i > 0; i-- {
  53. rHooks = append(rHooks, Hooks[i-1])
  54. }
  55. logger.Debug("hooks: pre-job")
  56. err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
  57. if err != nil {
  58. return err
  59. }
  60. for retry := 0; retry < maxRetry; retry++ {
  61. stopASAP := false // stop job as soon as possible
  62. if retry > 0 {
  63. logger.Info("retry syncing: %s, retry: %d", provider.Name(), retry)
  64. }
  65. err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
  66. if err != nil {
  67. return err
  68. }
  69. // start syncing
  70. managerChan <- jobMessage{tunasync.Syncing, provider.Name(), ""}
  71. err = provider.Start()
  72. if err != nil {
  73. logger.Error(
  74. "failed to start syncing job for %s: %s",
  75. provider.Name(), err.Error(),
  76. )
  77. return err
  78. }
  79. var syncErr error
  80. syncDone := make(chan error, 1)
  81. go func() {
  82. err := provider.Wait()
  83. if !stopASAP {
  84. syncDone <- err
  85. }
  86. }()
  87. select {
  88. case syncErr = <-syncDone:
  89. logger.Debug("syncing done")
  90. case <-kill:
  91. logger.Debug("received kill")
  92. stopASAP = true
  93. err := provider.Terminate()
  94. if err != nil {
  95. logger.Error("failed to terminate provider %s: %s", provider.Name(), err.Error())
  96. return err
  97. }
  98. syncErr = errors.New("killed by manager")
  99. }
  100. // post-exec hooks
  101. herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
  102. if herr != nil {
  103. return herr
  104. }
  105. if syncErr == nil {
  106. // syncing success
  107. logger.Info("succeeded syncing %s", provider.Name())
  108. managerChan <- jobMessage{tunasync.Success, provider.Name(), ""}
  109. // post-success hooks
  110. err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
  111. if err != nil {
  112. return err
  113. }
  114. return nil
  115. }
  116. // syncing failed
  117. logger.Warning("failed syncing %s: %s", provider.Name(), syncErr.Error())
  118. managerChan <- jobMessage{tunasync.Failed, provider.Name(), syncErr.Error()}
  119. // post-fail hooks
  120. logger.Debug("post-fail hooks")
  121. err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
  122. if err != nil {
  123. return err
  124. }
  125. // gracefully exit
  126. if stopASAP {
  127. logger.Debug("No retry, exit directly")
  128. return nil
  129. }
  130. // continue to next retry
  131. } // for retry
  132. return nil
  133. }
  134. runJob := func(kill <-chan empty, jobDone chan<- empty) {
  135. select {
  136. case semaphore <- empty{}:
  137. defer func() { <-semaphore }()
  138. runJobWrapper(kill, jobDone)
  139. case <-kill:
  140. jobDone <- empty{}
  141. return
  142. }
  143. }
  144. enabled := true // whether this job is stopped by the manager
  145. for {
  146. if enabled {
  147. kill := make(chan empty)
  148. jobDone := make(chan empty)
  149. go runJob(kill, jobDone)
  150. _wait_for_job:
  151. select {
  152. case <-jobDone:
  153. logger.Debug("job done")
  154. case ctrl := <-ctrlChan:
  155. switch ctrl {
  156. case jobStop:
  157. enabled = false
  158. close(kill)
  159. <-jobDone
  160. case jobDisable:
  161. close(kill)
  162. <-jobDone
  163. return nil
  164. case jobRestart:
  165. enabled = true
  166. close(kill)
  167. <-jobDone
  168. continue
  169. case jobStart:
  170. enabled = true
  171. goto _wait_for_job
  172. default:
  173. // TODO: implement this
  174. close(kill)
  175. return nil
  176. }
  177. }
  178. }
  179. ctrl := <-ctrlChan
  180. switch ctrl {
  181. case jobStop:
  182. enabled = false
  183. case jobDisable:
  184. return nil
  185. case jobRestart:
  186. enabled = true
  187. case jobStart:
  188. enabled = true
  189. default:
  190. // TODO
  191. return nil
  192. }
  193. }
  194. }