2
0

job.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. tunasync "github.com/tuna/tunasync/internal"
  9. )
// this file contains the workflow of a mirror job
  11. type ctrlAction uint8
  12. const (
  13. jobStart ctrlAction = iota
  14. jobStop // stop syncing keep the job
  15. jobDisable // disable the job (stops goroutine)
  16. jobRestart // restart syncing
  17. jobPing // ensure the goroutine is alive
  18. jobHalt // worker halts
  19. jobForceStart // ignore concurrent limit
  20. )
  21. type jobMessage struct {
  22. status tunasync.SyncStatus
  23. name string
  24. msg string
  25. schedule bool
  26. }
  27. const (
  28. // empty state
  29. stateNone uint32 = iota
  30. // ready to run, able to schedule
  31. stateReady
  32. // paused by jobStop
  33. statePaused
  34. // disabled by jobDisable
  35. stateDisabled
  36. // worker is halting
  37. stateHalting
  38. )
  39. // use to ensure all jobs are finished before
  40. // worker exit
  41. var jobsDone sync.WaitGroup
  42. type mirrorJob struct {
  43. provider mirrorProvider
  44. ctrlChan chan ctrlAction
  45. disabled chan empty
  46. state uint32
  47. }
  48. func newMirrorJob(provider mirrorProvider) *mirrorJob {
  49. return &mirrorJob{
  50. provider: provider,
  51. ctrlChan: make(chan ctrlAction, 1),
  52. state: stateNone,
  53. }
  54. }
  55. func (m *mirrorJob) Name() string {
  56. return m.provider.Name()
  57. }
  58. func (m *mirrorJob) State() uint32 {
  59. return atomic.LoadUint32(&(m.state))
  60. }
  61. func (m *mirrorJob) SetState(state uint32) {
  62. atomic.StoreUint32(&(m.state), state)
  63. }
  64. func (m *mirrorJob) SetProvider(provider mirrorProvider) error {
  65. s := m.State()
  66. if (s != stateNone) && (s != stateDisabled) {
  67. return fmt.Errorf("Provider cannot be switched when job state is %d", s)
  68. }
  69. m.provider = provider
  70. return nil
  71. }
  72. // runMirrorJob is the goroutine where syncing job runs in
  73. // arguments:
  74. // provider: mirror provider object
  75. // ctrlChan: receives messages from the manager
  76. // managerChan: push messages to the manager, this channel should have a larger buffer
  77. // sempaphore: make sure the concurrent running syncing job won't explode
  78. // TODO: message struct for managerChan
  79. func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
  80. jobsDone.Add(1)
  81. m.disabled = make(chan empty)
  82. defer func() {
  83. close(m.disabled)
  84. jobsDone.Done()
  85. }()
  86. provider := m.provider
  87. // to make code shorter
  88. runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
  89. for _, hook := range Hooks {
  90. if err := action(hook); err != nil {
  91. logger.Errorf(
  92. "failed at %s hooks for %s: %s",
  93. hookname, m.Name(), err.Error(),
  94. )
  95. managerChan <- jobMessage{
  96. tunasync.Failed, m.Name(),
  97. fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
  98. false,
  99. }
  100. return err
  101. }
  102. }
  103. return nil
  104. }
  105. runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
  106. defer close(jobDone)
  107. managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), "", false}
  108. logger.Noticef("start syncing: %s", m.Name())
  109. Hooks := provider.Hooks()
  110. rHooks := []jobHook{}
  111. for i := len(Hooks); i > 0; i-- {
  112. rHooks = append(rHooks, Hooks[i-1])
  113. }
  114. logger.Debug("hooks: pre-job")
  115. err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
  116. if err != nil {
  117. return err
  118. }
  119. for retry := 0; retry < maxRetry; retry++ {
  120. stopASAP := false // stop job as soon as possible
  121. if retry > 0 {
  122. logger.Noticef("retry syncing: %s, retry: %d", m.Name(), retry)
  123. }
  124. err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
  125. if err != nil {
  126. return err
  127. }
  128. // start syncing
  129. managerChan <- jobMessage{tunasync.Syncing, m.Name(), "", false}
  130. var syncErr error
  131. syncDone := make(chan error, 1)
  132. go func() {
  133. err := provider.Run()
  134. syncDone <- err
  135. }()
  136. select {
  137. case syncErr = <-syncDone:
  138. logger.Debug("syncing done")
  139. case <-kill:
  140. logger.Debug("received kill")
  141. stopASAP = true
  142. err := provider.Terminate()
  143. if err != nil {
  144. logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error())
  145. return err
  146. }
  147. syncErr = errors.New("killed by manager")
  148. }
  149. // post-exec hooks
  150. herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
  151. if herr != nil {
  152. return herr
  153. }
  154. if syncErr == nil {
  155. // syncing success
  156. logger.Noticef("succeeded syncing %s", m.Name())
  157. managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
  158. // post-success hooks
  159. err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
  160. if err != nil {
  161. return err
  162. }
  163. return nil
  164. }
  165. // syncing failed
  166. logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
  167. managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)}
  168. // post-fail hooks
  169. logger.Debug("post-fail hooks")
  170. err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
  171. if err != nil {
  172. return err
  173. }
  174. // gracefully exit
  175. if stopASAP {
  176. logger.Debug("No retry, exit directly")
  177. return nil
  178. }
  179. // continue to next retry
  180. } // for retry
  181. return nil
  182. }
  183. runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) {
  184. select {
  185. case semaphore <- empty{}:
  186. defer func() { <-semaphore }()
  187. runJobWrapper(kill, jobDone)
  188. case <-bypassSemaphore:
  189. logger.Noticef("Concurrent limit ignored by %s", m.Name())
  190. runJobWrapper(kill, jobDone)
  191. case <-kill:
  192. jobDone <- empty{}
  193. return
  194. }
  195. }
  196. bypassSemaphore := make(chan empty, 1)
  197. for {
  198. if m.State() == stateReady {
  199. kill := make(chan empty)
  200. jobDone := make(chan empty)
  201. go runJob(kill, jobDone, bypassSemaphore)
  202. _wait_for_job:
  203. select {
  204. case <-jobDone:
  205. logger.Debug("job done")
  206. case ctrl := <-m.ctrlChan:
  207. switch ctrl {
  208. case jobStop:
  209. m.SetState(statePaused)
  210. close(kill)
  211. <-jobDone
  212. case jobDisable:
  213. m.SetState(stateDisabled)
  214. close(kill)
  215. <-jobDone
  216. return nil
  217. case jobRestart:
  218. m.SetState(stateReady)
  219. close(kill)
  220. <-jobDone
  221. time.Sleep(time.Second) // Restart may fail if the process was not exited yet
  222. continue
  223. case jobForceStart:
  224. select { //non-blocking
  225. default:
  226. case bypassSemaphore <- empty{}:
  227. }
  228. fallthrough
  229. case jobStart:
  230. m.SetState(stateReady)
  231. goto _wait_for_job
  232. case jobHalt:
  233. m.SetState(stateHalting)
  234. close(kill)
  235. <-jobDone
  236. return nil
  237. default:
  238. // TODO: implement this
  239. close(kill)
  240. return nil
  241. }
  242. }
  243. }
  244. ctrl := <-m.ctrlChan
  245. switch ctrl {
  246. case jobStop:
  247. m.SetState(statePaused)
  248. case jobDisable:
  249. m.SetState(stateDisabled)
  250. return nil
  251. case jobForceStart:
  252. select { //non-blocking
  253. default:
  254. case bypassSemaphore <- empty{}:
  255. }
  256. fallthrough
  257. case jobRestart:
  258. fallthrough
  259. case jobStart:
  260. m.SetState(stateReady)
  261. default:
  262. // TODO
  263. return nil
  264. }
  265. }
  266. }