rsync_provider.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/tuna/tunasync/internal"
  8. )
  // rsyncConfig collects the settings newRsyncProvider needs to build an
  // rsyncProvider.
  type rsyncConfig struct {
  	// name is forwarded to the embedded baseProvider as the provider name.
  	name string
  	// rsyncCmd is the rsync executable to run; "rsync" is used when empty.
  	rsyncCmd string

  	// upstreamURL is the sync source; it must end with "/".
  	upstreamURL string
  	// username, when non-empty, is exported to the job as USER.
  	username string
  	// password, when non-empty, is exported to the job as RSYNC_PASSWORD.
  	password string
  	// excludeFile, when non-empty, is passed via --exclude-from.
  	excludeFile string

  	// extraOptions are appended after the generated options.
  	extraOptions []string
  	// overriddenOptions, when non-nil, replaces the built-in default options.
  	overriddenOptions []string
  	// useOverrideOnly requires overriddenOptions to be set and disables the
  	// generated timeout, IP-version, exclude-file and extra options.
  	useOverrideOnly bool
  	// rsyncNeverTimeout suppresses the --timeout option.
  	rsyncNeverTimeout bool
  	// rsyncTimeoutValue is the value for --timeout; 120 is used when it is
  	// not positive.
  	rsyncTimeoutValue int
  	// rsyncEnv is the environment map handed to the command job.
  	rsyncEnv map[string]string

  	// workingDir, logDir and logFile are stored in the provider context.
  	workingDir string
  	logDir     string
  	logFile    string

  	// useIPv6 adds "-6"; otherwise useIPv4 adds "-4".
  	useIPv6 bool
  	useIPv4 bool

  	// interval, retry and timeout are forwarded to baseProvider; a zero
  	// retry is replaced by defaultMaxRetry.
  	interval time.Duration
  	retry    int
  	timeout  time.Duration
  }
  25. // An RsyncProvider provides the implementation to rsync-based syncing jobs
  26. type rsyncProvider struct {
  27. baseProvider
  28. rsyncConfig
  29. options []string
  30. dataSize string
  31. }
  32. func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
  33. // TODO: check config options
  34. if !strings.HasSuffix(c.upstreamURL, "/") {
  35. return nil, errors.New("rsync upstream URL should ends with /")
  36. }
  37. if c.retry == 0 {
  38. c.retry = defaultMaxRetry
  39. }
  40. provider := &rsyncProvider{
  41. baseProvider: baseProvider{
  42. name: c.name,
  43. ctx: NewContext(),
  44. interval: c.interval,
  45. retry: c.retry,
  46. timeout: c.timeout,
  47. },
  48. rsyncConfig: c,
  49. }
  50. if c.rsyncCmd == "" {
  51. provider.rsyncCmd = "rsync"
  52. }
  53. if c.rsyncEnv == nil {
  54. provider.rsyncEnv = map[string]string{}
  55. }
  56. if c.username != "" {
  57. provider.rsyncEnv["USER"] = c.username
  58. }
  59. if c.password != "" {
  60. provider.rsyncEnv["RSYNC_PASSWORD"] = c.password
  61. }
  62. options := []string{
  63. "-aHvh", "--no-o", "--no-g", "--stats",
  64. "--filter", "risk .~tmp~/", "--exclude", ".~tmp~/",
  65. "--delete", "--delete-after", "--delay-updates",
  66. "--safe-links",
  67. }
  68. if c.overriddenOptions != nil {
  69. options = c.overriddenOptions
  70. }
  71. if c.useOverrideOnly == true {
  72. if c.overriddenOptions == nil {
  73. return nil, errors.New("rsync_override_only is set but no rsync_override provided")
  74. }
  75. } else {
  76. if !c.rsyncNeverTimeout {
  77. timeo := 120
  78. if c.rsyncTimeoutValue > 0 {
  79. timeo = c.rsyncTimeoutValue
  80. }
  81. options = append(options, fmt.Sprintf("--timeout=%d", timeo))
  82. }
  83. if c.useIPv6 {
  84. options = append(options, "-6")
  85. } else if c.useIPv4 {
  86. options = append(options, "-4")
  87. }
  88. if c.excludeFile != "" {
  89. options = append(options, "--exclude-from", c.excludeFile)
  90. }
  91. if c.extraOptions != nil {
  92. options = append(options, c.extraOptions...)
  93. }
  94. }
  95. provider.options = options
  96. provider.ctx.Set(_WorkingDirKey, c.workingDir)
  97. provider.ctx.Set(_LogDirKey, c.logDir)
  98. provider.ctx.Set(_LogFileKey, c.logFile)
  99. return provider, nil
  100. }
  101. func (p *rsyncProvider) Type() providerEnum {
  102. return provRsync
  103. }
  104. func (p *rsyncProvider) Upstream() string {
  105. return p.upstreamURL
  106. }
  107. func (p *rsyncProvider) DataSize() string {
  108. return p.dataSize
  109. }
  110. func (p *rsyncProvider) Run(started chan empty) error {
  111. p.dataSize = ""
  112. defer p.closeLogFile()
  113. if err := p.Start(); err != nil {
  114. return err
  115. }
  116. started <- empty{}
  117. if err := p.Wait(); err != nil {
  118. code, msg := internal.TranslateRsyncErrorCode(err)
  119. if code != 0 {
  120. logger.Debug("Rsync exitcode %d (%s)", code, msg)
  121. if p.logFileFd != nil {
  122. p.logFileFd.WriteString(msg + "\n")
  123. }
  124. }
  125. return err
  126. }
  127. p.dataSize = internal.ExtractSizeFromRsyncLog(p.LogFile())
  128. return nil
  129. }
  130. func (p *rsyncProvider) Start() error {
  131. p.Lock()
  132. defer p.Unlock()
  133. if p.IsRunning() {
  134. return errors.New("provider is currently running")
  135. }
  136. command := []string{p.rsyncCmd}
  137. command = append(command, p.options...)
  138. command = append(command, p.upstreamURL, p.WorkingDir())
  139. p.cmd = newCmdJob(p, command, p.WorkingDir(), p.rsyncEnv)
  140. if err := p.prepareLogFile(false); err != nil {
  141. return err
  142. }
  143. if err := p.cmd.Start(); err != nil {
  144. return err
  145. }
  146. p.isRunning.Store(true)
  147. logger.Debugf("set isRunning to true: %s", p.Name())
  148. return nil
  149. }