123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319 |
- package worker
- import (
- "errors"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- tunasync "github.com/tuna/tunasync/internal"
- )
// this file contains the workflow of a mirror job
// ctrlAction is a control command delivered to a mirrorJob through its
// ctrlChan.
type ctrlAction uint8

// Control actions handled by mirrorJob.Run. Values are assigned in order by
// iota, so the declaration order is significant.
const (
	// jobStart makes the job ready to run.
	jobStart ctrlAction = iota
	// jobStop stops syncing but keeps the job.
	jobStop
	// jobDisable disables the job, which stops its goroutine.
	jobDisable
	// jobRestart restarts syncing.
	jobRestart
	// jobPing is meant to ensure the job goroutine is alive.
	jobPing
	// jobHalt signals that the worker is halting.
	jobHalt
	// jobForceStart is like jobStart but ignores the concurrency limit.
	jobForceStart
)
- type jobMessage struct {
- status tunasync.SyncStatus
- name string
- msg string
- schedule bool
- }
// Job states. They are stored in mirrorJob.state and read or written
// through State/SetState. Values are assigned in order by iota, and all are
// typed uint32.
const (
	stateNone     uint32 = iota // empty state
	stateReady                  // ready to run, able to schedule
	statePaused                 // paused by jobStop
	stateDisabled               // disabled by jobDisable
	stateHalting                // worker is halting
)
var (
	// jobsDone is used to ensure all jobs are finished before the worker
	// exits: each mirrorJob.Run adds itself on entry and calls Done on exit.
	jobsDone sync.WaitGroup
)
- type mirrorJob struct {
- provider mirrorProvider
- ctrlChan chan ctrlAction
- disabled chan empty
- state uint32
- size string
- }
- func newMirrorJob(provider mirrorProvider) *mirrorJob {
- return &mirrorJob{
- provider: provider,
- ctrlChan: make(chan ctrlAction, 1),
- state: stateNone,
- }
- }
- func (m *mirrorJob) Name() string {
- return m.provider.Name()
- }
- func (m *mirrorJob) State() uint32 {
- return atomic.LoadUint32(&(m.state))
- }
- func (m *mirrorJob) SetState(state uint32) {
- atomic.StoreUint32(&(m.state), state)
- }
- func (m *mirrorJob) SetProvider(provider mirrorProvider) error {
- s := m.State()
- if (s != stateNone) && (s != stateDisabled) {
- return fmt.Errorf("Provider cannot be switched when job state is %d", s)
- }
- m.provider = provider
- return nil
- }
- // runMirrorJob is the goroutine where syncing job runs in
- // arguments:
- // provider: mirror provider object
- // ctrlChan: receives messages from the manager
- // managerChan: push messages to the manager, this channel should have a larger buffer
- // sempaphore: make sure the concurrent running syncing job won't explode
- // TODO: message struct for managerChan
- func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
- jobsDone.Add(1)
- m.disabled = make(chan empty)
- defer func() {
- close(m.disabled)
- jobsDone.Done()
- }()
- provider := m.provider
- // to make code shorter
- runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error {
- for _, hook := range Hooks {
- if err := action(hook); err != nil {
- logger.Errorf(
- "failed at %s hooks for %s: %s",
- hookname, m.Name(), err.Error(),
- )
- managerChan <- jobMessage{
- tunasync.Failed, m.Name(),
- fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
- true,
- }
- return err
- }
- }
- return nil
- }
- runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error {
- defer close(jobDone)
- managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), "", false}
- logger.Noticef("start syncing: %s", m.Name())
- Hooks := provider.Hooks()
- rHooks := []jobHook{}
- for i := len(Hooks); i > 0; i-- {
- rHooks = append(rHooks, Hooks[i-1])
- }
- logger.Debug("hooks: pre-job")
- err := runHooks(Hooks, func(h jobHook) error { return h.preJob() }, "pre-job")
- if err != nil {
- return err
- }
- for retry := 0; retry < provider.Retry(); retry++ {
- stopASAP := false // stop job as soon as possible
- if retry > 0 {
- logger.Noticef("retry syncing: %s, retry: %d", m.Name(), retry)
- }
- err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec")
- if err != nil {
- return err
- }
- // start syncing
- managerChan <- jobMessage{tunasync.Syncing, m.Name(), "", false}
- var syncErr error
- syncDone := make(chan error, 1)
- started := make(chan empty, 10) // we may receive "started" more than one time (e.g. two_stage_rsync)
- go func() {
- err := provider.Run(started)
- syncDone <- err
- }()
- select { // Wait until provider started or error happened
- case err := <-syncDone:
- logger.Errorf("failed to start provider %s: %s", m.Name(), err.Error())
- syncDone <- err // it will be read again later
- case <-started:
- logger.Debug("provider started")
- }
- // Now terminating the provider is feasible
- select {
- case syncErr = <-syncDone:
- logger.Debug("syncing done")
- case <-kill:
- logger.Debug("received kill")
- stopASAP = true
- err := provider.Terminate()
- if err != nil {
- logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error())
- return err
- }
- syncErr = errors.New("killed by manager")
- }
- // post-exec hooks
- herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
- if herr != nil {
- return herr
- }
- if syncErr == nil {
- // syncing success
- logger.Noticef("succeeded syncing %s", m.Name())
- // post-success hooks
- logger.Debug("post-success hooks")
- err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
- if err != nil {
- return err
- }
- } else {
- // syncing failed
- logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
- // post-fail hooks
- logger.Debug("post-fail hooks")
- err := runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
- if err != nil {
- return err
- }
- }
- if syncErr == nil {
- // syncing success
- m.size = provider.DataSize()
- managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
- return nil
- }
- // syncing failed
- managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == provider.Retry()-1) && (m.State() == stateReady)}
- // gracefully exit
- if stopASAP {
- logger.Debug("No retry, exit directly")
- return nil
- }
- // continue to next retry
- } // for retry
- return nil
- }
- runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) {
- select {
- case semaphore <- empty{}:
- defer func() { <-semaphore }()
- runJobWrapper(kill, jobDone)
- case <-bypassSemaphore:
- logger.Noticef("Concurrent limit ignored by %s", m.Name())
- runJobWrapper(kill, jobDone)
- case <-kill:
- jobDone <- empty{}
- return
- }
- }
- bypassSemaphore := make(chan empty, 1)
- for {
- if m.State() == stateReady {
- kill := make(chan empty)
- jobDone := make(chan empty)
- go runJob(kill, jobDone, bypassSemaphore)
- _wait_for_job:
- select {
- case <-jobDone:
- logger.Debug("job done")
- case ctrl := <-m.ctrlChan:
- switch ctrl {
- case jobStop:
- m.SetState(statePaused)
- close(kill)
- <-jobDone
- case jobDisable:
- m.SetState(stateDisabled)
- close(kill)
- <-jobDone
- return nil
- case jobRestart:
- m.SetState(stateReady)
- close(kill)
- <-jobDone
- time.Sleep(time.Second) // Restart may fail if the process was not exited yet
- continue
- case jobForceStart:
- select { //non-blocking
- default:
- case bypassSemaphore <- empty{}:
- }
- fallthrough
- case jobStart:
- m.SetState(stateReady)
- goto _wait_for_job
- case jobHalt:
- m.SetState(stateHalting)
- close(kill)
- <-jobDone
- return nil
- default:
- // TODO: implement this
- close(kill)
- return nil
- }
- }
- }
- ctrl := <-m.ctrlChan
- switch ctrl {
- case jobStop:
- m.SetState(statePaused)
- case jobDisable:
- m.SetState(stateDisabled)
- return nil
- case jobForceStart:
- select { //non-blocking
- default:
- case bypassSemaphore <- empty{}:
- }
- fallthrough
- case jobRestart:
- fallthrough
- case jobStart:
- m.SetState(stateReady)
- default:
- // TODO
- return nil
- }
- }
- }
|