package worker

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	tunasync "github.com/tuna/tunasync/internal"
)

// this file contains the workflow of a mirror job

type ctrlAction uint8

const (
	jobStart      ctrlAction = iota
	jobStop                  // stop syncing, keep the job
	jobDisable               // disable the job (stops the goroutine)
	jobRestart               // restart syncing
	jobPing                  // ensure the goroutine is alive
	jobHalt                  // worker halts
	jobForceStart            // ignore the concurrency limit
)
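
// jobMessage is the status report a job pushes to the manager;
// schedule tells the manager whether it should schedule the next sync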
type jobMessage struct {
	status   tunasync.SyncStatus
	name     string
	msg      string
	schedule bool
}
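
// job lifecycle states, stored atomically in mirrorJob.state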
const (
	// empty state
	stateNone uint32 = iota
	// ready to run, able to schedule
	stateReady
	// paused by jobStop
	statePaused
	// disabled by jobDisable
	stateDisabled
	// worker is halting
	stateHalting
)

// used to ensure that all jobs are finished before the worker exits
var jobsDone sync.WaitGroup
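
// mirrorJob couples a mirrorProvider with the control plumbing that drives it;
// state is always accessed atomically via State()/SetState()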
type mirrorJob struct {
	provider mirrorProvider
	ctrlChan chan ctrlAction
	disabled chan empty
	state    uint32
	size     string
}
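
// newMirrorJob creates a job in stateNone; the caller starts it by
// spawning Run and sending jobStart on ctrlChan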
func newMirrorJob(provider mirrorProvider) *mirrorJob {
	return &mirrorJob{
		provider: provider,
		ctrlChan: make(chan ctrlAction, 1),
		state:    stateNone,
	}
}

func (m *mirrorJob) Name() string {
	return m.provider.Name()
}

func (m *mirrorJob) State() uint32 {
	return atomic.LoadUint32(&(m.state))
}

func (m *mirrorJob) SetState(state uint32) {
	atomic.StoreUint32(&(m.state), state)
}

func (m *mirrorJob) SetProvider(provider mirrorProvider) error {
	s := m.State()
	if (s != stateNone) && (s != stateDisabled) {
		return fmt.Errorf("Provider cannot be switched when job state is %d", s)
	}
	m.provider = provider
	return nil
}

// Run is the goroutine where the syncing job runs.
// arguments:
//
//	managerChan: pushes messages to the manager; this channel should have a large buffer
//	semaphore: bounds the number of concurrently running syncing jobs
//
// the provider and the control channel (ctrlChan) are taken from m
//
// TODO: message struct for managerChan
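//
// a minimal usage sketch (the channel sizes are illustrative, not
// mandated by this file; provider stands for any mirrorProvider):
//
//	managerChan := make(chan jobMessage, 32)
//	semaphore := make(chan empty, 4) // at most 4 concurrent syncs
//	job := newMirrorJob(provider)
//	go job.Run(managerChan, semaphore)
//	job.ctrlChan <- jobStart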
func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
	jobsDone.Add(1)
	m.disabled = make(chan empty)
	defer func() {
		close(m.disabled)
		jobsDone.Done()
	}()
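
	// snapshot the provider: a later SetProvider only takes effect
	// on the next call to Run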
	provider := m.provider

	// helper that runs a list of hooks and reports the first failure
	runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
		for _, hook := range Hooks {
			if err := action(hook); err != nil {
				logger.Errorf(
					"failed at %s hooks for %s: %s",
					hookname, m.Name(), err.Error(),
				)
				managerChan <- jobMessage{
					tunasync.Failed, m.Name(),
					fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
					true,
				}
				return err
			}
		}
		return nil
	}
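
	// runJobWrapper performs one complete sync cycle: pre-job hooks,
	// up to provider.Retry() attempts, then the matching post hooks;
	// closing kill terminates the attempt that is currently running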
	runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
		defer close(jobDone)

		managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), "", false}
		logger.Noticef("start syncing: %s", m.Name())
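
		// post hooks run in the reverse order of the pre hooks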
		Hooks := provider.Hooks()
		rHooks := []jobHook{}
		for i := len(Hooks); i > 0; i-- {
			rHooks = append(rHooks, Hooks[i-1])
		}

		logger.Debug("hooks: pre-job")
		err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
		if err != nil {
			return err
		}

		for retry := 0; retry < provider.Retry(); retry++ {
			stopASAP := false // stop the job as soon as possible
			if retry > 0 {
				logger.Noticef("retry syncing: %s, retry: %d", m.Name(), retry)
			}
			err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
			if err != nil {
				return err
			}

			// start syncing
			managerChan <- jobMessage{tunasync.Syncing, m.Name(), "", false}

			var syncErr error
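			// syncDone is buffered so the provider goroutine never blocks
			// when reporting its result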
			syncDone := make(chan error, 1)
			started := make(chan empty, 10) // we may receive "started" more than once (e.g. two_stage_rsync)
			go func() {
				err := provider.Run(started)
				syncDone <- err
			}()

			// wait until the provider has started or an error happened
			select {
			case err := <-syncDone:
				logger.Errorf("failed to start provider %s: %s", m.Name(), err.Error())
				syncDone <- err // it will be read again later
			case <-started:
				logger.Debug("provider started")
			}

			// now terminating the provider is feasible
			var termErr error
			timeout := provider.Timeout()
			if timeout <= 0 {
				timeout = 100000 * time.Hour // effectively never time out
			}
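			// wait for the sync to finish, time out, or get killed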
			select {
			case syncErr = <-syncDone:
				logger.Debug("syncing done")
			case <-time.After(timeout):
				logger.Notice("provider timeout")
				termErr = provider.Terminate()
				syncErr = fmt.Errorf("%s timeout after %v", m.Name(), timeout)
			case <-kill:
				logger.Debug("received kill")
				stopASAP = true
				termErr = provider.Terminate()
				syncErr = errors.New("killed by manager")
			}
			if termErr != nil {
				logger.Errorf("failed to terminate provider %s: %s", m.Name(), termErr.Error())
				return termErr
			}

			// post-exec hooks
			herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
			if herr != nil {
				return herr
			}

			if syncErr == nil {
				// syncing succeeded
				logger.Noticef("succeeded syncing %s", m.Name())
				// post-success hooks
				logger.Debug("post-success hooks")
				err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
				if err != nil {
					return err
				}
			} else {
				// syncing failed
				logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
				// post-fail hooks
				logger.Debug("post-fail hooks")
				err := runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
				if err != nil {
					return err
				}
			}

			if syncErr == nil {
				// syncing succeeded: report the new data size and whether
				// the manager should schedule the next sync
				m.size = provider.DataSize()
				managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
				return nil
			}

			// syncing failed; only ask for scheduling on the last retry
			managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == provider.Retry()-1) && (m.State() == stateReady)}

			// if killed, exit without retrying
			if stopASAP {
				logger.Debug("No retry, exit directly")
				return nil
			}
			// otherwise continue to the next retry
		} // for retry
		return nil
	}
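
	// runJob gates runJobWrapper behind the global semaphore;
	// a token on bypassSemaphore (sent by jobForceStart) skips the wait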
	runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) {
		select {
		case semaphore <- empty{}:
			defer func() { <-semaphore }()
			runJobWrapper(kill, jobDone)
		case <-bypassSemaphore:
			logger.Noticef("Concurrent limit ignored by %s", m.Name())
			runJobWrapper(kill, jobDone)
		case <-kill:
			jobDone <- empty{}
			return
		}
	}
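
	// bypassSemaphore holds at most one pending token, so one jobForceStart
	// lets exactly one upcoming run skip the concurrency limit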
	bypassSemaphore := make(chan empty, 1)
	for {
		if m.State() == stateReady {
			kill := make(chan empty)
			jobDone := make(chan empty)
			go runJob(kill, jobDone, bypassSemaphore)
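
			// wait for the job to finish while still serving control
			// commands from the manager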
		_wait_for_job:
			select {
			case <-jobDone:
				logger.Debug("job done")
			case ctrl := <-m.ctrlChan:
				switch ctrl {
				case jobStop:
					m.SetState(statePaused)
					close(kill)
					<-jobDone
				case jobDisable:
					m.SetState(stateDisabled)
					close(kill)
					<-jobDone
					return nil
				case jobRestart:
					m.SetState(stateReady)
					close(kill)
					<-jobDone
					time.Sleep(time.Second) // the restart may fail if the old process has not exited yet
					continue
				case jobForceStart:
					select { // non-blocking
					default:
					case bypassSemaphore <- empty{}:
					}
					fallthrough
				case jobStart:
					m.SetState(stateReady)
					goto _wait_for_job
				case jobHalt:
					m.SetState(stateHalting)
					close(kill)
					<-jobDone
					return nil
				default:
					// TODO: implement this
					close(kill)
					return nil
				}
			}
		}
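
		// the job is not running: block until the next control command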
		ctrl := <-m.ctrlChan
		switch ctrl {
		case jobStop:
			m.SetState(statePaused)
		case jobDisable:
			m.SetState(stateDisabled)
			return nil
		case jobForceStart:
			select { // non-blocking
			default:
			case bypassSemaphore <- empty{}:
			}
			fallthrough
		case jobRestart:
			fallthrough
		case jobStart:
			m.SetState(stateReady)
		default:
			// TODO
			return nil
		}
	}
}