2
0

rsync_provider.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/tuna/tunasync/internal"
  8. )
  9. type rsyncConfig struct {
  10. name string
  11. rsyncCmd string
  12. upstreamURL, username, password, excludeFile string
  13. extraOptions []string
  14. globalOptions []string
  15. overriddenOptions []string
  16. useOverrideOnly bool
  17. rsyncNeverTimeout bool
  18. rsyncTimeoutValue int
  19. rsyncEnv map[string]string
  20. workingDir, logDir, logFile string
  21. useIPv6, useIPv4 bool
  22. interval time.Duration
  23. retry int
  24. timeout time.Duration
  25. }
  26. // An RsyncProvider provides the implementation to rsync-based syncing jobs
  27. type rsyncProvider struct {
  28. baseProvider
  29. rsyncConfig
  30. options []string
  31. dataSize string
  32. }
  33. func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
  34. // TODO: check config options
  35. if !strings.HasSuffix(c.upstreamURL, "/") {
  36. return nil, errors.New("rsync upstream URL should ends with /")
  37. }
  38. if c.retry == 0 {
  39. c.retry = defaultMaxRetry
  40. }
  41. provider := &rsyncProvider{
  42. baseProvider: baseProvider{
  43. name: c.name,
  44. ctx: NewContext(),
  45. interval: c.interval,
  46. retry: c.retry,
  47. timeout: c.timeout,
  48. },
  49. rsyncConfig: c,
  50. }
  51. if c.rsyncCmd == "" {
  52. provider.rsyncCmd = "rsync"
  53. }
  54. if c.rsyncEnv == nil {
  55. provider.rsyncEnv = map[string]string{}
  56. }
  57. if c.username != "" {
  58. provider.rsyncEnv["USER"] = c.username
  59. }
  60. if c.password != "" {
  61. provider.rsyncEnv["RSYNC_PASSWORD"] = c.password
  62. }
  63. options := []string{
  64. "-aHvh", "--no-o", "--no-g", "--stats",
  65. "--filter", "risk .~tmp~/", "--exclude", ".~tmp~/",
  66. "--delete", "--delete-after", "--delay-updates",
  67. "--safe-links",
  68. }
  69. if c.overriddenOptions != nil {
  70. options = c.overriddenOptions
  71. }
  72. if c.useOverrideOnly == true {
  73. if c.overriddenOptions == nil {
  74. return nil, errors.New("rsync_override_only is set but no rsync_override provided")
  75. }
  76. // use overridden options only
  77. } else {
  78. if !c.rsyncNeverTimeout {
  79. timeo := 120
  80. if c.rsyncTimeoutValue > 0 {
  81. timeo = c.rsyncTimeoutValue
  82. }
  83. options = append(options, fmt.Sprintf("--timeout=%d", timeo))
  84. }
  85. if c.useIPv6 {
  86. options = append(options, "-6")
  87. } else if c.useIPv4 {
  88. options = append(options, "-4")
  89. }
  90. if c.excludeFile != "" {
  91. options = append(options, "--exclude-from", c.excludeFile)
  92. }
  93. if c.globalOptions != nil {
  94. options = append(options, c.globalOptions...)
  95. }
  96. if c.extraOptions != nil {
  97. options = append(options, c.extraOptions...)
  98. }
  99. }
  100. provider.options = options
  101. provider.ctx.Set(_WorkingDirKey, c.workingDir)
  102. provider.ctx.Set(_LogDirKey, c.logDir)
  103. provider.ctx.Set(_LogFileKey, c.logFile)
  104. return provider, nil
  105. }
  106. func (p *rsyncProvider) Type() providerEnum {
  107. return provRsync
  108. }
  109. func (p *rsyncProvider) Upstream() string {
  110. return p.upstreamURL
  111. }
  112. func (p *rsyncProvider) DataSize() string {
  113. return p.dataSize
  114. }
  115. func (p *rsyncProvider) Run(started chan empty) error {
  116. p.dataSize = ""
  117. defer p.closeLogFile()
  118. if err := p.Start(); err != nil {
  119. return err
  120. }
  121. started <- empty{}
  122. if err := p.Wait(); err != nil {
  123. code, msg := internal.TranslateRsyncErrorCode(err)
  124. if code != 0 {
  125. logger.Debug("Rsync exitcode %d (%s)", code, msg)
  126. if p.logFileFd != nil {
  127. p.logFileFd.WriteString(msg + "\n")
  128. }
  129. }
  130. return err
  131. }
  132. p.dataSize = internal.ExtractSizeFromRsyncLog(p.LogFile())
  133. return nil
  134. }
  135. func (p *rsyncProvider) Start() error {
  136. p.Lock()
  137. defer p.Unlock()
  138. if p.IsRunning() {
  139. return errors.New("provider is currently running")
  140. }
  141. command := []string{p.rsyncCmd}
  142. command = append(command, p.options...)
  143. command = append(command, p.upstreamURL, p.WorkingDir())
  144. p.cmd = newCmdJob(p, command, p.WorkingDir(), p.rsyncEnv)
  145. if err := p.prepareLogFile(false); err != nil {
  146. return err
  147. }
  148. if err := p.cmd.Start(); err != nil {
  149. return err
  150. }
  151. p.isRunning.Store(true)
  152. logger.Debugf("set isRunning to true: %s", p.Name())
  153. return nil
  154. }