rsync_provider.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package worker
  2. import (
  3. "errors"
  4. "io/ioutil"
  5. "strings"
  6. "time"
  7. "github.com/tuna/tunasync/internal"
  8. )
  // rsyncConfig gathers the settings from which newRsyncProvider builds an
  // rsyncProvider.
  type rsyncConfig struct {
  	// name is copied into the embedded baseProvider by newRsyncProvider.
  	name string
  	// rsyncCmd is the executable to run; newRsyncProvider substitutes
  	// "rsync" when it is left empty.
  	rsyncCmd string

  	// upstreamURL is the sync source; newRsyncProvider rejects values that
  	// do not end with "/".
  	upstreamURL string
  	// username and password, when non-empty, are exported to the command
  	// as the USER and RSYNC_PASSWORD environment variables by Start.
  	username string
  	password string
  	// excludeFile, when non-empty, is passed as --exclude-from.
  	excludeFile string

  	// extraOptions are appended after the built-in rsync options.
  	extraOptions []string

  	// workingDir, logDir and logFile are stored in the provider context by
  	// newRsyncProvider.
  	workingDir string
  	logDir     string
  	logFile    string

  	// useIPv6 adds "-6"; useIPv4 adds "-4" only when useIPv6 is false.
  	useIPv6 bool
  	useIPv4 bool

  	// interval is copied into the embedded baseProvider.
  	interval time.Duration
  	// retry is copied into the embedded baseProvider; zero is replaced by
  	// defaultMaxRetry.
  	retry int
  }
  19. // An RsyncProvider provides the implementation to rsync-based syncing jobs
  20. type rsyncProvider struct {
  21. baseProvider
  22. rsyncConfig
  23. options []string
  24. dataSize string
  25. }
  26. func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
  27. // TODO: check config options
  28. if !strings.HasSuffix(c.upstreamURL, "/") {
  29. return nil, errors.New("rsync upstream URL should ends with /")
  30. }
  31. if c.retry == 0 {
  32. c.retry = defaultMaxRetry
  33. }
  34. provider := &rsyncProvider{
  35. baseProvider: baseProvider{
  36. name: c.name,
  37. ctx: NewContext(),
  38. interval: c.interval,
  39. retry: c.retry,
  40. },
  41. rsyncConfig: c,
  42. }
  43. if c.rsyncCmd == "" {
  44. provider.rsyncCmd = "rsync"
  45. }
  46. options := []string{
  47. "-aHvh", "--no-o", "--no-g", "--stats",
  48. "--exclude", ".~tmp~/",
  49. "--delete", "--delete-after", "--delay-updates",
  50. "--safe-links", "--timeout=120", "--contimeout=120",
  51. }
  52. if c.useIPv6 {
  53. options = append(options, "-6")
  54. } else if c.useIPv4 {
  55. options = append(options, "-4")
  56. }
  57. if c.excludeFile != "" {
  58. options = append(options, "--exclude-from", c.excludeFile)
  59. }
  60. if c.extraOptions != nil {
  61. options = append(options, c.extraOptions...)
  62. }
  63. provider.options = options
  64. provider.ctx.Set(_WorkingDirKey, c.workingDir)
  65. provider.ctx.Set(_LogDirKey, c.logDir)
  66. provider.ctx.Set(_LogFileKey, c.logFile)
  67. return provider, nil
  68. }
  69. func (p *rsyncProvider) Type() providerEnum {
  70. return provRsync
  71. }
  72. func (p *rsyncProvider) Upstream() string {
  73. return p.upstreamURL
  74. }
  75. func (p *rsyncProvider) DataSize() string {
  76. return p.dataSize
  77. }
  78. func (p *rsyncProvider) Run() error {
  79. p.dataSize = ""
  80. if err := p.Start(); err != nil {
  81. return err
  82. }
  83. if err := p.Wait(); err != nil {
  84. return err
  85. }
  86. if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil {
  87. p.dataSize = internal.ExtractSizeFromRsyncLog(logContent)
  88. }
  89. return nil
  90. }
  91. func (p *rsyncProvider) Start() error {
  92. p.Lock()
  93. defer p.Unlock()
  94. if p.IsRunning() {
  95. return errors.New("provider is currently running")
  96. }
  97. env := map[string]string{}
  98. if p.username != "" {
  99. env["USER"] = p.username
  100. }
  101. if p.password != "" {
  102. env["RSYNC_PASSWORD"] = p.password
  103. }
  104. command := []string{p.rsyncCmd}
  105. command = append(command, p.options...)
  106. command = append(command, p.upstreamURL, p.WorkingDir())
  107. p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
  108. if err := p.prepareLogFile(false); err != nil {
  109. return err
  110. }
  111. if err := p.cmd.Start(); err != nil {
  112. return err
  113. }
  114. p.isRunning.Store(true)
  115. return nil
  116. }