rsync_provider.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package worker
  2. import (
  3. "errors"
  4. "io/ioutil"
  5. "strings"
  6. "time"
  7. "github.com/tuna/tunasync/internal"
  8. )
  9. type rsyncConfig struct {
  10. name string
  11. rsyncCmd string
  12. upstreamURL, username, password, excludeFile string
  13. extraOptions []string
  14. overriddenOptions []string
  15. workingDir, logDir, logFile string
  16. useIPv6, useIPv4 bool
  17. interval time.Duration
  18. retry int
  19. }
  20. // An RsyncProvider provides the implementation to rsync-based syncing jobs
  21. type rsyncProvider struct {
  22. baseProvider
  23. rsyncConfig
  24. options []string
  25. dataSize string
  26. }
  27. func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
  28. // TODO: check config options
  29. if !strings.HasSuffix(c.upstreamURL, "/") {
  30. return nil, errors.New("rsync upstream URL should ends with /")
  31. }
  32. if c.retry == 0 {
  33. c.retry = defaultMaxRetry
  34. }
  35. provider := &rsyncProvider{
  36. baseProvider: baseProvider{
  37. name: c.name,
  38. ctx: NewContext(),
  39. interval: c.interval,
  40. retry: c.retry,
  41. },
  42. rsyncConfig: c,
  43. }
  44. if c.rsyncCmd == "" {
  45. provider.rsyncCmd = "rsync"
  46. }
  47. options := []string{
  48. "-aHvh", "--no-o", "--no-g", "--stats",
  49. "--exclude", ".~tmp~/",
  50. "--delete", "--delete-after", "--delay-updates",
  51. "--safe-links", "--timeout=120", "--contimeout=120",
  52. }
  53. if overriddenOptions != nil {
  54. options = overriddenOptions
  55. }
  56. if c.useIPv6 {
  57. options = append(options, "-6")
  58. } else if c.useIPv4 {
  59. options = append(options, "-4")
  60. }
  61. if c.excludeFile != "" {
  62. options = append(options, "--exclude-from", c.excludeFile)
  63. }
  64. if c.extraOptions != nil {
  65. options = append(options, c.extraOptions...)
  66. }
  67. provider.options = options
  68. provider.ctx.Set(_WorkingDirKey, c.workingDir)
  69. provider.ctx.Set(_LogDirKey, c.logDir)
  70. provider.ctx.Set(_LogFileKey, c.logFile)
  71. return provider, nil
  72. }
  73. func (p *rsyncProvider) Type() providerEnum {
  74. return provRsync
  75. }
  76. func (p *rsyncProvider) Upstream() string {
  77. return p.upstreamURL
  78. }
  79. func (p *rsyncProvider) DataSize() string {
  80. return p.dataSize
  81. }
  82. func (p *rsyncProvider) Run() error {
  83. p.dataSize = ""
  84. if err := p.Start(); err != nil {
  85. return err
  86. }
  87. if err := p.Wait(); err != nil {
  88. return err
  89. }
  90. if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil {
  91. p.dataSize = internal.ExtractSizeFromRsyncLog(logContent)
  92. }
  93. return nil
  94. }
  95. func (p *rsyncProvider) Start() error {
  96. p.Lock()
  97. defer p.Unlock()
  98. if p.IsRunning() {
  99. return errors.New("provider is currently running")
  100. }
  101. env := map[string]string{}
  102. if p.username != "" {
  103. env["USER"] = p.username
  104. }
  105. if p.password != "" {
  106. env["RSYNC_PASSWORD"] = p.password
  107. }
  108. command := []string{p.rsyncCmd}
  109. command = append(command, p.options...)
  110. command = append(command, p.upstreamURL, p.WorkingDir())
  111. p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
  112. if err := p.prepareLogFile(false); err != nil {
  113. return err
  114. }
  115. if err := p.cmd.Start(); err != nil {
  116. return err
  117. }
  118. p.isRunning.Store(true)
  119. return nil
  120. }