rsync_provider.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package worker
  2. import (
  3. "errors"
  4. "strings"
  5. "time"
  6. )
  7. type rsyncConfig struct {
  8. name string
  9. rsyncCmd string
  10. upstreamURL, username, password, excludeFile string
  11. workingDir, logDir, logFile string
  12. useIPv6 bool
  13. interval time.Duration
  14. retry int
  15. }
  16. // An RsyncProvider provides the implementation to rsync-based syncing jobs
  17. type rsyncProvider struct {
  18. baseProvider
  19. rsyncConfig
  20. options []string
  21. }
  22. func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
  23. // TODO: check config options
  24. if !strings.HasSuffix(c.upstreamURL, "/") {
  25. return nil, errors.New("rsync upstream URL should ends with /")
  26. }
  27. provider := &rsyncProvider{
  28. baseProvider: baseProvider{
  29. name: c.name,
  30. ctx: NewContext(),
  31. interval: c.interval,
  32. retry: c.retry,
  33. },
  34. rsyncConfig: c,
  35. }
  36. if c.rsyncCmd == "" {
  37. provider.rsyncCmd = "rsync"
  38. }
  39. options := []string{
  40. "-aHvh", "--no-o", "--no-g", "--stats",
  41. "--exclude", ".~tmp~/",
  42. "--delete", "--delete-after", "--delay-updates",
  43. "--safe-links", "--timeout=120", "--contimeout=120",
  44. }
  45. if c.useIPv6 {
  46. options = append(options, "-6")
  47. }
  48. if c.excludeFile != "" {
  49. options = append(options, "--exclude-from", c.excludeFile)
  50. }
  51. provider.options = options
  52. provider.ctx.Set(_WorkingDirKey, c.workingDir)
  53. provider.ctx.Set(_LogDirKey, c.logDir)
  54. provider.ctx.Set(_LogFileKey, c.logFile)
  55. return provider, nil
  56. }
  57. func (p *rsyncProvider) Type() providerEnum {
  58. return provRsync
  59. }
  60. func (p *rsyncProvider) Upstream() string {
  61. return p.upstreamURL
  62. }
  63. func (p *rsyncProvider) Run() error {
  64. if err := p.Start(); err != nil {
  65. return err
  66. }
  67. return p.Wait()
  68. }
  69. func (p *rsyncProvider) Start() error {
  70. env := map[string]string{}
  71. if p.username != "" {
  72. env["USER"] = p.username
  73. }
  74. if p.password != "" {
  75. env["RSYNC_PASSWORD"] = p.password
  76. }
  77. command := []string{p.rsyncCmd}
  78. command = append(command, p.options...)
  79. command = append(command, p.upstreamURL, p.WorkingDir())
  80. p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
  81. if err := p.prepareLogFile(); err != nil {
  82. return err
  83. }
  84. if err := p.cmd.Start(); err != nil {
  85. return err
  86. }
  87. p.isRunning.Store(true)
  88. return nil
  89. }