cmd_provider.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. "regexp"
  6. "time"
  7. "github.com/anmitsu/go-shlex"
  8. "github.com/tuna/tunasync/internal"
  9. )
  10. type cmdConfig struct {
  11. name string
  12. upstreamURL, command string
  13. workingDir, logDir, logFile string
  14. interval time.Duration
  15. retry int
  16. timeout time.Duration
  17. env map[string]string
  18. failOnMatch string
  19. sizePattern string
  20. }
  21. type cmdProvider struct {
  22. baseProvider
  23. cmdConfig
  24. command []string
  25. dataSize string
  26. failOnMatch *regexp.Regexp
  27. sizePattern *regexp.Regexp
  28. }
  29. func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
  30. // TODO: check config options
  31. if c.retry == 0 {
  32. c.retry = defaultMaxRetry
  33. }
  34. provider := &cmdProvider{
  35. baseProvider: baseProvider{
  36. name: c.name,
  37. ctx: NewContext(),
  38. interval: c.interval,
  39. retry: c.retry,
  40. timeout: c.timeout,
  41. },
  42. cmdConfig: c,
  43. }
  44. provider.ctx.Set(_WorkingDirKey, c.workingDir)
  45. provider.ctx.Set(_LogDirKey, c.logDir)
  46. provider.ctx.Set(_LogFileKey, c.logFile)
  47. cmd, err := shlex.Split(c.command, true)
  48. if err != nil {
  49. return nil, err
  50. }
  51. provider.command = cmd
  52. if len(c.failOnMatch) > 0 {
  53. var err error
  54. failOnMatch, err := regexp.Compile(c.failOnMatch)
  55. if err != nil {
  56. return nil, errors.New("fail-on-match regexp error: " + err.Error())
  57. }
  58. provider.failOnMatch = failOnMatch
  59. }
  60. if len(c.sizePattern) > 0 {
  61. var err error
  62. sizePattern, err := regexp.Compile(c.sizePattern)
  63. if err != nil {
  64. return nil, errors.New("size-pattern regexp error: " + err.Error())
  65. }
  66. provider.sizePattern = sizePattern
  67. }
  68. return provider, nil
  69. }
  70. func (p *cmdProvider) Type() providerEnum {
  71. return provCommand
  72. }
  73. func (p *cmdProvider) Upstream() string {
  74. return p.upstreamURL
  75. }
  76. func (p *cmdProvider) DataSize() string {
  77. return p.dataSize
  78. }
  79. func (p *cmdProvider) Run(started chan empty) error {
  80. p.dataSize = ""
  81. defer p.closeLogFile()
  82. if err := p.Start(); err != nil {
  83. return err
  84. }
  85. started <- empty{}
  86. if err := p.Wait(); err != nil {
  87. return err
  88. }
  89. if p.failOnMatch != nil {
  90. matches, err := internal.FindAllSubmatchInFile(p.LogFile(), p.failOnMatch)
  91. logger.Infof("FindAllSubmatchInFile: %q\n", matches)
  92. if err != nil {
  93. return err
  94. }
  95. if len(matches) != 0 {
  96. logger.Debug("Fail-on-match: %r", matches)
  97. return fmt.Errorf("Fail-on-match regexp found %d matches", len(matches))
  98. }
  99. }
  100. if p.sizePattern != nil {
  101. p.dataSize = internal.ExtractSizeFromLog(p.LogFile(), p.sizePattern)
  102. }
  103. return nil
  104. }
  105. func (p *cmdProvider) Start() error {
  106. p.Lock()
  107. defer p.Unlock()
  108. if p.IsRunning() {
  109. return errors.New("provider is currently running")
  110. }
  111. env := map[string]string{
  112. "TUNASYNC_MIRROR_NAME": p.Name(),
  113. "TUNASYNC_WORKING_DIR": p.WorkingDir(),
  114. "TUNASYNC_UPSTREAM_URL": p.upstreamURL,
  115. "TUNASYNC_LOG_DIR": p.LogDir(),
  116. "TUNASYNC_LOG_FILE": p.LogFile(),
  117. }
  118. for k, v := range p.env {
  119. env[k] = v
  120. }
  121. p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env)
  122. if err := p.prepareLogFile(false); err != nil {
  123. return err
  124. }
  125. if err := p.cmd.Start(); err != nil {
  126. return err
  127. }
  128. p.isRunning.Store(true)
  129. logger.Debugf("set isRunning to true: %s", p.Name())
  130. return nil
  131. }