2
0

base_provider.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package worker
  2. import (
  3. "os"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
// baseProvider is the base mixin of providers.
//
// It carries the state and accessors defined in this file; Run and Start are
// placeholders here that panic.
type baseProvider struct {
	// Embedded mutex; Terminate holds it for its whole duration.
	sync.Mutex

	// ctx is the current context; EnterContext and ExitContext replace it.
	ctx *Context
	// name is the value returned by Name.
	name string
	// interval is the value returned by Interval.
	interval time.Duration
	// retry is the value returned by Retry.
	retry int
	// timeout is the value returned by Timeout.
	timeout time.Duration
	// isMaster is the value returned by IsMaster.
	isMaster bool

	// cmd is the job that Wait, Terminate and prepareLogFile operate on.
	cmd *cmdJob
	// logFileFd is the log file opened by prepareLogFile; closeLogFile
	// closes it and resets it to nil.
	logFileFd *os.File

	// isRunning is read as a bool by IsRunning; Wait stores false into it
	// when it returns.
	isRunning atomic.Value

	// successExitCodes is managed by SetSuccessExitCodes and
	// GetSuccessExitCodes.
	successExitCodes []int

	// cgroup, zfs and docker hold the most recently added hook of each
	// concrete type (see AddHook); they stay nil if none was added.
	cgroup *cgroupHook
	zfs    *zfsHook
	docker *dockerHook

	// hooks holds every hook passed to AddHook, in the order added.
	hooks []jobHook
}
  26. func (p *baseProvider) Name() string {
  27. return p.name
  28. }
  29. func (p *baseProvider) EnterContext() *Context {
  30. p.ctx = p.ctx.Enter()
  31. return p.ctx
  32. }
  33. func (p *baseProvider) ExitContext() *Context {
  34. p.ctx, _ = p.ctx.Exit()
  35. return p.ctx
  36. }
  37. func (p *baseProvider) Context() *Context {
  38. return p.ctx
  39. }
  40. func (p *baseProvider) Interval() time.Duration {
  41. // logger.Debug("interval for %s: %v", p.Name(), p.interval)
  42. return p.interval
  43. }
  44. func (p *baseProvider) Retry() int {
  45. return p.retry
  46. }
  47. func (p *baseProvider) Timeout() time.Duration {
  48. return p.timeout
  49. }
  50. func (p *baseProvider) IsMaster() bool {
  51. return p.isMaster
  52. }
  53. func (p *baseProvider) WorkingDir() string {
  54. if v, ok := p.ctx.Get(_WorkingDirKey); ok {
  55. if s, ok := v.(string); ok {
  56. return s
  57. }
  58. }
  59. panic("working dir is impossible to be non-exist")
  60. }
  61. func (p *baseProvider) LogDir() string {
  62. if v, ok := p.ctx.Get(_LogDirKey); ok {
  63. if s, ok := v.(string); ok {
  64. return s
  65. }
  66. }
  67. panic("log dir is impossible to be unavailable")
  68. }
  69. func (p *baseProvider) LogFile() string {
  70. if v, ok := p.ctx.Get(_LogFileKey); ok {
  71. if s, ok := v.(string); ok {
  72. return s
  73. }
  74. }
  75. panic("log file is impossible to be unavailable")
  76. }
  77. func (p *baseProvider) AddHook(hook jobHook) {
  78. switch v := hook.(type) {
  79. case *cgroupHook:
  80. p.cgroup = v
  81. case *zfsHook:
  82. p.zfs = v
  83. case *dockerHook:
  84. p.docker = v
  85. }
  86. p.hooks = append(p.hooks, hook)
  87. }
  88. func (p *baseProvider) Hooks() []jobHook {
  89. return p.hooks
  90. }
  91. func (p *baseProvider) Cgroup() *cgroupHook {
  92. return p.cgroup
  93. }
  94. func (p *baseProvider) ZFS() *zfsHook {
  95. return p.zfs
  96. }
  97. func (p *baseProvider) Docker() *dockerHook {
  98. return p.docker
  99. }
  100. func (p *baseProvider) prepareLogFile(append bool) error {
  101. if p.LogFile() == "/dev/null" {
  102. p.cmd.SetLogFile(nil)
  103. return nil
  104. }
  105. appendMode := 0
  106. if append {
  107. appendMode = os.O_APPEND
  108. }
  109. logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE|appendMode, 0644)
  110. if err != nil {
  111. logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
  112. return err
  113. }
  114. p.logFileFd = logFile
  115. p.cmd.SetLogFile(logFile)
  116. return nil
  117. }
  118. func (p *baseProvider) closeLogFile() (err error) {
  119. if p.logFileFd != nil {
  120. err = p.logFileFd.Close()
  121. p.logFileFd = nil
  122. }
  123. return
  124. }
  125. func (p *baseProvider) Run(started chan empty) error {
  126. panic("Not Implemented")
  127. }
  128. func (p *baseProvider) Start() error {
  129. panic("Not Implemented")
  130. }
  131. func (p *baseProvider) IsRunning() bool {
  132. isRunning, _ := p.isRunning.Load().(bool)
  133. return isRunning
  134. }
  135. func (p *baseProvider) Wait() error {
  136. defer func() {
  137. logger.Debugf("set isRunning to false: %s", p.Name())
  138. p.isRunning.Store(false)
  139. }()
  140. logger.Debugf("calling Wait: %s", p.Name())
  141. return p.cmd.Wait()
  142. }
  143. func (p *baseProvider) Terminate() error {
  144. p.Lock()
  145. defer p.Unlock()
  146. logger.Debugf("terminating provider: %s", p.Name())
  147. if !p.IsRunning() {
  148. logger.Warningf("Terminate() called while IsRunning is false: %s", p.Name())
  149. return nil
  150. }
  151. err := p.cmd.Terminate()
  152. return err
  153. }
  154. func (p *baseProvider) DataSize() string {
  155. return ""
  156. }
  157. func (p *baseProvider) SetSuccessExitCodes(codes []int) {
  158. if codes == nil {
  159. p.successExitCodes = []int{}
  160. } else {
  161. p.successExitCodes = codes
  162. }
  163. }
  164. func (p *baseProvider) GetSuccessExitCodes() []int {
  165. if p.successExitCodes == nil {
  166. return []int{}
  167. }
  168. return p.successExitCodes
  169. }