rsync_provider.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package worker
  2. import (
  3. "errors"
  4. "strings"
  5. "time"
  6. "github.com/tuna/tunasync/internal"
  7. )
  8. type rsyncConfig struct {
  9. name string
  10. rsyncCmd string
  11. upstreamURL, username, password, excludeFile string
  12. extraOptions []string
  13. overriddenOptions []string
  14. rsyncEnv map[string]string
  15. workingDir, logDir, logFile string
  16. useIPv6, useIPv4 bool
  17. interval time.Duration
  18. retry int
  19. timeout time.Duration
  20. }
  21. // An RsyncProvider provides the implementation to rsync-based syncing jobs
  22. type rsyncProvider struct {
  23. baseProvider
  24. rsyncConfig
  25. options []string
  26. dataSize string
  27. }
  28. func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
  29. // TODO: check config options
  30. if !strings.HasSuffix(c.upstreamURL, "/") {
  31. return nil, errors.New("rsync upstream URL should ends with /")
  32. }
  33. if c.retry == 0 {
  34. c.retry = defaultMaxRetry
  35. }
  36. provider := &rsyncProvider{
  37. baseProvider: baseProvider{
  38. name: c.name,
  39. ctx: NewContext(),
  40. interval: c.interval,
  41. retry: c.retry,
  42. timeout: c.timeout,
  43. },
  44. rsyncConfig: c,
  45. }
  46. if c.rsyncCmd == "" {
  47. provider.rsyncCmd = "rsync"
  48. }
  49. if c.rsyncEnv == nil {
  50. provider.rsyncEnv = map[string]string{}
  51. }
  52. if c.username != "" {
  53. provider.rsyncEnv["USER"] = c.username
  54. }
  55. if c.password != "" {
  56. provider.rsyncEnv["RSYNC_PASSWORD"] = c.password
  57. }
  58. options := []string{
  59. "-aHvh", "--no-o", "--no-g", "--stats",
  60. "--exclude", ".~tmp~/",
  61. "--delete", "--delete-after", "--delay-updates",
  62. "--safe-links", "--timeout=120",
  63. }
  64. if c.overriddenOptions != nil {
  65. options = c.overriddenOptions
  66. }
  67. if c.useIPv6 {
  68. options = append(options, "-6")
  69. } else if c.useIPv4 {
  70. options = append(options, "-4")
  71. }
  72. if c.excludeFile != "" {
  73. options = append(options, "--exclude-from", c.excludeFile)
  74. }
  75. if c.extraOptions != nil {
  76. options = append(options, c.extraOptions...)
  77. }
  78. provider.options = options
  79. provider.ctx.Set(_WorkingDirKey, c.workingDir)
  80. provider.ctx.Set(_LogDirKey, c.logDir)
  81. provider.ctx.Set(_LogFileKey, c.logFile)
  82. return provider, nil
  83. }
  84. func (p *rsyncProvider) Type() providerEnum {
  85. return provRsync
  86. }
  87. func (p *rsyncProvider) Upstream() string {
  88. return p.upstreamURL
  89. }
  90. func (p *rsyncProvider) DataSize() string {
  91. return p.dataSize
  92. }
  93. func (p *rsyncProvider) Run(started chan empty) error {
  94. p.dataSize = ""
  95. defer p.closeLogFile()
  96. if err := p.Start(); err != nil {
  97. return err
  98. }
  99. started <- empty{}
  100. if err := p.Wait(); err != nil {
  101. code, msg := internal.TranslateRsyncErrorCode(err)
  102. if code != 0 {
  103. logger.Debug("Rsync exitcode %d (%s)", code, msg)
  104. if p.logFileFd != nil {
  105. p.logFileFd.WriteString(msg + "\n")
  106. }
  107. }
  108. return err
  109. }
  110. p.dataSize = internal.ExtractSizeFromRsyncLog(p.LogFile())
  111. return nil
  112. }
  113. func (p *rsyncProvider) Start() error {
  114. p.Lock()
  115. defer p.Unlock()
  116. if p.IsRunning() {
  117. return errors.New("provider is currently running")
  118. }
  119. command := []string{p.rsyncCmd}
  120. command = append(command, p.options...)
  121. command = append(command, p.upstreamURL, p.WorkingDir())
  122. p.cmd = newCmdJob(p, command, p.WorkingDir(), p.rsyncEnv)
  123. if err := p.prepareLogFile(false); err != nil {
  124. return err
  125. }
  126. if err := p.cmd.Start(); err != nil {
  127. return err
  128. }
  129. p.isRunning.Store(true)
  130. logger.Debugf("set isRunning to true: %s", p.Name())
  131. return nil
  132. }