provider.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package worker
  2. import (
  3. "os"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  8. // mirror provider is the wrapper of mirror jobs
  9. type providerType uint8
  10. const (
  11. _WorkingDirKey = "working_dir"
  12. _LogDirKey = "log_dir"
  13. _LogFileKey = "log_file"
  14. )
  15. // A mirrorProvider instance
  16. type mirrorProvider interface {
  17. // name
  18. Name() string
  19. // run mirror job in background
  20. Run() error
  21. // run mirror job in background
  22. Start() error
  23. // Wait job to finish
  24. Wait() error
  25. // terminate mirror job
  26. Terminate() error
  27. // job hooks
  28. IsRunning() bool
  29. AddHook(hook jobHook)
  30. Hooks() []jobHook
  31. Interval() time.Duration
  32. WorkingDir() string
  33. LogDir() string
  34. LogFile() string
  35. // enter context
  36. EnterContext() *Context
  37. // exit context
  38. ExitContext() *Context
  39. // return context
  40. Context() *Context
  41. }
  42. type baseProvider struct {
  43. sync.Mutex
  44. ctx *Context
  45. name string
  46. interval time.Duration
  47. cmd *cmdJob
  48. isRunning atomic.Value
  49. logFile *os.File
  50. hooks []jobHook
  51. }
  52. func (p *baseProvider) Name() string {
  53. return p.name
  54. }
  55. func (p *baseProvider) EnterContext() *Context {
  56. p.ctx = p.ctx.Enter()
  57. return p.ctx
  58. }
  59. func (p *baseProvider) ExitContext() *Context {
  60. p.ctx, _ = p.ctx.Exit()
  61. return p.ctx
  62. }
  63. func (p *baseProvider) Context() *Context {
  64. return p.ctx
  65. }
  66. func (p *baseProvider) Interval() time.Duration {
  67. return p.interval
  68. }
  69. func (p *baseProvider) WorkingDir() string {
  70. if v, ok := p.ctx.Get(_WorkingDirKey); ok {
  71. if s, ok := v.(string); ok {
  72. return s
  73. }
  74. }
  75. panic("working dir is impossible to be non-exist")
  76. }
  77. func (p *baseProvider) LogDir() string {
  78. if v, ok := p.ctx.Get(_LogDirKey); ok {
  79. if s, ok := v.(string); ok {
  80. return s
  81. }
  82. }
  83. panic("log dir is impossible to be unavailable")
  84. }
  85. func (p *baseProvider) LogFile() string {
  86. if v, ok := p.ctx.Get(_LogFileKey); ok {
  87. if s, ok := v.(string); ok {
  88. return s
  89. }
  90. }
  91. panic("log dir is impossible to be unavailable")
  92. }
  93. func (p *baseProvider) AddHook(hook jobHook) {
  94. p.hooks = append(p.hooks, hook)
  95. }
  96. func (p *baseProvider) Hooks() []jobHook {
  97. return p.hooks
  98. }
  99. func (p *baseProvider) setLogFile() error {
  100. if p.LogFile() == "/dev/null" {
  101. p.cmd.SetLogFile(nil)
  102. return nil
  103. }
  104. if p.logFile == nil {
  105. logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
  106. if err != nil {
  107. logger.Error("Error opening logfile %s: %s", p.LogFile(), err.Error())
  108. return err
  109. }
  110. p.logFile = logFile
  111. }
  112. p.cmd.SetLogFile(p.logFile)
  113. return nil
  114. }
  115. func (p *baseProvider) Run() error {
  116. panic("Not Implemented")
  117. }
  118. func (p *baseProvider) Start() error {
  119. panic("Not Implemented")
  120. }
  121. func (p *baseProvider) IsRunning() bool {
  122. isRunning, _ := p.isRunning.Load().(bool)
  123. return isRunning
  124. }
  125. func (p *baseProvider) Wait() error {
  126. defer func() {
  127. p.Lock()
  128. p.isRunning.Store(false)
  129. if p.logFile != nil {
  130. p.logFile.Close()
  131. p.logFile = nil
  132. }
  133. p.Unlock()
  134. }()
  135. return p.cmd.Wait()
  136. }
  137. func (p *baseProvider) Terminate() error {
  138. logger.Debug("terminating provider: %s", p.Name())
  139. if !p.IsRunning() {
  140. return nil
  141. }
  142. p.Lock()
  143. if p.logFile != nil {
  144. p.logFile.Close()
  145. p.logFile = nil
  146. }
  147. p.Unlock()
  148. err := p.cmd.Terminate()
  149. p.isRunning.Store(false)
  150. return err
  151. }