rsync_provider.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package worker
  2. import (
  3. "errors"
  4. "strings"
  5. "time"
  6. "github.com/tuna/tunasync/internal"
  7. )
  8. type rsyncConfig struct {
  9. name string
  10. rsyncCmd string
  11. upstreamURL, username, password, excludeFile string
  12. extraOptions []string
  13. overriddenOptions []string
  14. rsyncEnv map[string]string
  15. workingDir, logDir, logFile string
  16. useIPv6, useIPv4 bool
  17. interval time.Duration
  18. retry int
  19. }
  20. // An RsyncProvider provides the implementation to rsync-based syncing jobs
  21. type rsyncProvider struct {
  22. baseProvider
  23. rsyncConfig
  24. options []string
  25. dataSize string
  26. }
  27. func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
  28. // TODO: check config options
  29. if !strings.HasSuffix(c.upstreamURL, "/") {
  30. return nil, errors.New("rsync upstream URL should ends with /")
  31. }
  32. if c.retry == 0 {
  33. c.retry = defaultMaxRetry
  34. }
  35. provider := &rsyncProvider{
  36. baseProvider: baseProvider{
  37. name: c.name,
  38. ctx: NewContext(),
  39. interval: c.interval,
  40. retry: c.retry,
  41. },
  42. rsyncConfig: c,
  43. }
  44. if c.rsyncCmd == "" {
  45. provider.rsyncCmd = "rsync"
  46. }
  47. if c.rsyncEnv == nil {
  48. provider.rsyncEnv = map[string]string{}
  49. }
  50. if c.username != "" {
  51. provider.rsyncEnv["USER"] = c.username
  52. }
  53. if c.password != "" {
  54. provider.rsyncEnv["RSYNC_PASSWORD"] = c.password
  55. }
  56. options := []string{
  57. "-aHvh", "--no-o", "--no-g", "--stats",
  58. "--exclude", ".~tmp~/",
  59. "--delete", "--delete-after", "--delay-updates",
  60. "--safe-links", "--timeout=120",
  61. }
  62. if c.overriddenOptions != nil {
  63. options = c.overriddenOptions
  64. }
  65. if c.useIPv6 {
  66. options = append(options, "-6")
  67. } else if c.useIPv4 {
  68. options = append(options, "-4")
  69. }
  70. if c.excludeFile != "" {
  71. options = append(options, "--exclude-from", c.excludeFile)
  72. }
  73. if c.extraOptions != nil {
  74. options = append(options, c.extraOptions...)
  75. }
  76. provider.options = options
  77. provider.ctx.Set(_WorkingDirKey, c.workingDir)
  78. provider.ctx.Set(_LogDirKey, c.logDir)
  79. provider.ctx.Set(_LogFileKey, c.logFile)
  80. return provider, nil
  81. }
  82. func (p *rsyncProvider) Type() providerEnum {
  83. return provRsync
  84. }
  85. func (p *rsyncProvider) Upstream() string {
  86. return p.upstreamURL
  87. }
  88. func (p *rsyncProvider) DataSize() string {
  89. return p.dataSize
  90. }
  91. func (p *rsyncProvider) Run() error {
  92. p.dataSize = ""
  93. defer p.closeLogFile()
  94. if err := p.Start(); err != nil {
  95. return err
  96. }
  97. if err := p.Wait(); err != nil {
  98. return err
  99. }
  100. p.dataSize = internal.ExtractSizeFromRsyncLog(p.LogFile())
  101. return nil
  102. }
  103. func (p *rsyncProvider) Start() error {
  104. p.Lock()
  105. defer p.Unlock()
  106. if p.IsRunning() {
  107. return errors.New("provider is currently running")
  108. }
  109. command := []string{p.rsyncCmd}
  110. command = append(command, p.options...)
  111. command = append(command, p.upstreamURL, p.WorkingDir())
  112. p.cmd = newCmdJob(p, command, p.WorkingDir(), p.rsyncEnv)
  113. if err := p.prepareLogFile(false); err != nil {
  114. return err
  115. }
  116. if err := p.cmd.Start(); err != nil {
  117. return err
  118. }
  119. p.isRunning.Store(true)
  120. return nil
  121. }