provider.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package worker
  2. import (
  3. "errors"
  4. "os"
  5. "time"
  6. )
  7. // mirror provider is the wrapper of mirror jobs
  8. type providerType uint8
  9. const (
  10. _WorkingDirKey = "working_dir"
  11. _LogDirKey = "log_dir"
  12. _LogFileKey = "log_file"
  13. )
  14. // A mirrorProvider instance
  15. type mirrorProvider interface {
  16. // name
  17. Name() string
  18. // TODO: implement Run, Terminate and Hooks
  19. // run mirror job in background
  20. Start() error
  21. // Wait job to finish
  22. Wait() error
  23. // terminate mirror job
  24. Terminate() error
  25. // job hooks
  26. Hooks() []jobHook
  27. Interval() time.Duration
  28. WorkingDir() string
  29. LogDir() string
  30. LogFile() string
  31. // enter context
  32. EnterContext() *Context
  33. // exit context
  34. ExitContext() *Context
  35. // return context
  36. Context() *Context
  37. }
  38. type baseProvider struct {
  39. ctx *Context
  40. name string
  41. interval time.Duration
  42. cmd *cmdJob
  43. logFile *os.File
  44. hooks []jobHook
  45. }
  46. func (p *baseProvider) Name() string {
  47. return p.name
  48. }
  49. func (p *baseProvider) EnterContext() *Context {
  50. p.ctx = p.ctx.Enter()
  51. return p.ctx
  52. }
  53. func (p *baseProvider) ExitContext() *Context {
  54. p.ctx, _ = p.ctx.Exit()
  55. return p.ctx
  56. }
  57. func (p *baseProvider) Context() *Context {
  58. return p.ctx
  59. }
  60. func (p *baseProvider) Interval() time.Duration {
  61. return p.interval
  62. }
  63. func (p *baseProvider) WorkingDir() string {
  64. if v, ok := p.ctx.Get(_WorkingDirKey); ok {
  65. if s, ok := v.(string); ok {
  66. return s
  67. }
  68. }
  69. panic("working dir is impossible to be non-exist")
  70. }
  71. func (p *baseProvider) LogDir() string {
  72. if v, ok := p.ctx.Get(_LogDirKey); ok {
  73. if s, ok := v.(string); ok {
  74. return s
  75. }
  76. }
  77. panic("log dir is impossible to be unavailable")
  78. }
  79. func (p *baseProvider) LogFile() string {
  80. if v, ok := p.ctx.Get(_LogFileKey); ok {
  81. if s, ok := v.(string); ok {
  82. return s
  83. }
  84. }
  85. panic("log dir is impossible to be unavailable")
  86. }
  87. func (p *baseProvider) AddHook(hook jobHook) {
  88. p.hooks = append(p.hooks, hook)
  89. }
  90. func (p *baseProvider) Hooks() []jobHook {
  91. return p.hooks
  92. }
  93. func (p *baseProvider) setLogFile() error {
  94. if p.LogFile() == "/dev/null" {
  95. p.cmd.SetLogFile(nil)
  96. return nil
  97. }
  98. logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
  99. if err != nil {
  100. logger.Error("Error opening logfile %s: %s", p.LogFile(), err.Error())
  101. return err
  102. }
  103. p.logFile = logFile
  104. p.cmd.SetLogFile(logFile)
  105. return nil
  106. }
  107. func (p *baseProvider) Wait() error {
  108. if p.logFile != nil {
  109. defer p.logFile.Close()
  110. }
  111. return p.cmd.Wait()
  112. }
  113. func (p *baseProvider) Terminate() error {
  114. logger.Debug("terminating provider: %s", p.Name())
  115. if p.cmd == nil {
  116. return errors.New("provider command job not initialized")
  117. }
  118. if p.logFile != nil {
  119. p.logFile.Close()
  120. }
  121. err := p.cmd.Terminate()
  122. return err
  123. }