rsync_provider.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package worker
  2. import (
  3. "errors"
  4. "io/ioutil"
  5. "strings"
  6. "time"
  7. "github.com/tuna/tunasync/internal"
  8. )
  // rsyncConfig collects the settings consumed by newRsyncProvider.
  type rsyncConfig struct {
  	// name is copied into the provider's embedded baseProvider.
  	name string
  	// rsyncCmd is the executable to run; newRsyncProvider uses "rsync"
  	// when it is empty.
  	rsyncCmd string

  	// upstreamURL is the sync source; newRsyncProvider rejects values
  	// that do not end with "/".
  	upstreamURL string
  	// username, when non-empty, is handed to the job as the USER
  	// environment entry.
  	username string
  	// password, when non-empty, is handed to the job as the
  	// RSYNC_PASSWORD environment entry.
  	password string
  	// excludeFile, when non-empty, is passed as --exclude-from.
  	excludeFile string

  	// workingDir, logDir and logFile are stored in the provider context
  	// under _WorkingDirKey, _LogDirKey and _LogFileKey respectively.
  	workingDir string
  	logDir     string
  	logFile    string

  	// useIPv6 adds "-6" to the rsync options. useIPv4 adds "-4", but only
  	// when useIPv6 is false.
  	useIPv6 bool
  	useIPv4 bool

  	// interval is copied into the provider's embedded baseProvider.
  	interval time.Duration
  	// retry is copied into baseProvider; zero is replaced by
  	// defaultMaxRetry.
  	retry int
  }
  18. // An RsyncProvider provides the implementation to rsync-based syncing jobs
  19. type rsyncProvider struct {
  20. baseProvider
  21. rsyncConfig
  22. options []string
  23. dataSize string
  24. }
  25. func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
  26. // TODO: check config options
  27. if !strings.HasSuffix(c.upstreamURL, "/") {
  28. return nil, errors.New("rsync upstream URL should ends with /")
  29. }
  30. if c.retry == 0 {
  31. c.retry = defaultMaxRetry
  32. }
  33. provider := &rsyncProvider{
  34. baseProvider: baseProvider{
  35. name: c.name,
  36. ctx: NewContext(),
  37. interval: c.interval,
  38. retry: c.retry,
  39. },
  40. rsyncConfig: c,
  41. }
  42. if c.rsyncCmd == "" {
  43. provider.rsyncCmd = "rsync"
  44. }
  45. options := []string{
  46. "-aHvh", "--no-o", "--no-g", "--stats",
  47. "--exclude", ".~tmp~/",
  48. "--delete", "--delete-after", "--delay-updates",
  49. "--safe-links", "--timeout=120", "--contimeout=120",
  50. }
  51. if c.useIPv6 {
  52. options = append(options, "-6")
  53. } else if c.useIPv4 {
  54. options = append(options, "-4")
  55. }
  56. if c.excludeFile != "" {
  57. options = append(options, "--exclude-from", c.excludeFile)
  58. }
  59. provider.options = options
  60. provider.ctx.Set(_WorkingDirKey, c.workingDir)
  61. provider.ctx.Set(_LogDirKey, c.logDir)
  62. provider.ctx.Set(_LogFileKey, c.logFile)
  63. return provider, nil
  64. }
  65. func (p *rsyncProvider) Type() providerEnum {
  66. return provRsync
  67. }
  68. func (p *rsyncProvider) Upstream() string {
  69. return p.upstreamURL
  70. }
  71. func (p *rsyncProvider) DataSize() string {
  72. return p.dataSize
  73. }
  74. func (p *rsyncProvider) Run() error {
  75. p.dataSize = ""
  76. if err := p.Start(); err != nil {
  77. return err
  78. }
  79. if err := p.Wait(); err != nil {
  80. return err
  81. }
  82. if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil {
  83. p.dataSize = internal.ExtractSizeFromRsyncLog(logContent)
  84. }
  85. return nil
  86. }
  87. func (p *rsyncProvider) Start() error {
  88. p.Lock()
  89. defer p.Unlock()
  90. if p.IsRunning() {
  91. return errors.New("provider is currently running")
  92. }
  93. env := map[string]string{}
  94. if p.username != "" {
  95. env["USER"] = p.username
  96. }
  97. if p.password != "" {
  98. env["RSYNC_PASSWORD"] = p.password
  99. }
  100. command := []string{p.rsyncCmd}
  101. command = append(command, p.options...)
  102. command = append(command, p.upstreamURL, p.WorkingDir())
  103. p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
  104. if err := p.prepareLogFile(false); err != nil {
  105. return err
  106. }
  107. if err := p.cmd.Start(); err != nil {
  108. return err
  109. }
  110. p.isRunning.Store(true)
  111. return nil
  112. }