123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- package worker
- import (
- "errors"
- "fmt"
- "strings"
- "time"
- "github.com/tuna/tunasync/internal"
- )
// rsyncConfig carries every setting newRsyncProvider needs to build an
// rsyncProvider.
type rsyncConfig struct {
	// name is the provider name handed to the embedded baseProvider.
	name string
	// rsyncCmd is the executable to run; "rsync" is used when it is empty.
	rsyncCmd string
	// upstreamURL is the sync source and must end with "/".
	upstreamURL string
	// username is exported to the child process as USER when non-empty.
	username string
	// password is exported as RSYNC_PASSWORD when non-empty.
	password string
	// excludeFile is passed via --exclude-from when non-empty.
	excludeFile string
	// extraOptions are appended after all generated options.
	extraOptions []string
	// overriddenOptions, when non-nil, replace the built-in default options.
	overriddenOptions []string
	// useOverrideOnly suppresses every generated option (timeout, -4/-6,
	// --exclude-from, extraOptions); it requires overriddenOptions.
	useOverrideOnly bool
	// rsyncNeverTimeout disables the generated --timeout option.
	rsyncNeverTimeout bool
	// rsyncTimeoutValue is the --timeout value; 120 is used when it is not
	// positive.
	rsyncTimeoutValue int
	// rsyncEnv holds additional environment variables for the rsync process.
	rsyncEnv map[string]string
	// workingDir, logDir and logFile are stored in the provider context.
	workingDir string
	logDir     string
	logFile    string
	// useIPv6 adds "-6"; otherwise useIPv4 adds "-4".
	useIPv6 bool
	useIPv4 bool
	// interval, retry and timeout are forwarded to the baseProvider; a zero
	// retry is replaced by defaultMaxRetry.
	interval time.Duration
	retry    int
	timeout  time.Duration
}
- // An RsyncProvider provides the implementation to rsync-based syncing jobs
- type rsyncProvider struct {
- baseProvider
- rsyncConfig
- options []string
- dataSize string
- }
- func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
- // TODO: check config options
- if !strings.HasSuffix(c.upstreamURL, "/") {
- return nil, errors.New("rsync upstream URL should ends with /")
- }
- if c.retry == 0 {
- c.retry = defaultMaxRetry
- }
- provider := &rsyncProvider{
- baseProvider: baseProvider{
- name: c.name,
- ctx: NewContext(),
- interval: c.interval,
- retry: c.retry,
- timeout: c.timeout,
- },
- rsyncConfig: c,
- }
- if c.rsyncCmd == "" {
- provider.rsyncCmd = "rsync"
- }
- if c.rsyncEnv == nil {
- provider.rsyncEnv = map[string]string{}
- }
- if c.username != "" {
- provider.rsyncEnv["USER"] = c.username
- }
- if c.password != "" {
- provider.rsyncEnv["RSYNC_PASSWORD"] = c.password
- }
- options := []string{
- "-aHvh", "--no-o", "--no-g", "--stats",
- "--filter", "risk .~tmp~/", "--exclude", ".~tmp~/",
- "--delete", "--delete-after", "--delay-updates",
- "--safe-links",
- }
- if c.overriddenOptions != nil {
- options = c.overriddenOptions
- }
- if c.useOverrideOnly == true {
- if c.overriddenOptions == nil {
- return nil, errors.New("rsync_override_only is set but no rsync_override provided")
- }
- } else {
- if !c.rsyncNeverTimeout {
- timeo := 120
- if c.rsyncTimeoutValue > 0 {
- timeo = c.rsyncTimeoutValue
- }
- options = append(options, fmt.Sprintf("--timeout=%d", timeo))
- }
- if c.useIPv6 {
- options = append(options, "-6")
- } else if c.useIPv4 {
- options = append(options, "-4")
- }
- if c.excludeFile != "" {
- options = append(options, "--exclude-from", c.excludeFile)
- }
- if c.extraOptions != nil {
- options = append(options, c.extraOptions...)
- }
- }
- provider.options = options
- provider.ctx.Set(_WorkingDirKey, c.workingDir)
- provider.ctx.Set(_LogDirKey, c.logDir)
- provider.ctx.Set(_LogFileKey, c.logFile)
- return provider, nil
- }
- func (p *rsyncProvider) Type() providerEnum {
- return provRsync
- }
- func (p *rsyncProvider) Upstream() string {
- return p.upstreamURL
- }
- func (p *rsyncProvider) DataSize() string {
- return p.dataSize
- }
- func (p *rsyncProvider) Run(started chan empty) error {
- p.dataSize = ""
- defer p.closeLogFile()
- if err := p.Start(); err != nil {
- return err
- }
- started <- empty{}
- if err := p.Wait(); err != nil {
- code, msg := internal.TranslateRsyncErrorCode(err)
- if code != 0 {
- logger.Debug("Rsync exitcode %d (%s)", code, msg)
- if p.logFileFd != nil {
- p.logFileFd.WriteString(msg + "\n")
- }
- }
- return err
- }
- p.dataSize = internal.ExtractSizeFromRsyncLog(p.LogFile())
- return nil
- }
- func (p *rsyncProvider) Start() error {
- p.Lock()
- defer p.Unlock()
- if p.IsRunning() {
- return errors.New("provider is currently running")
- }
- command := []string{p.rsyncCmd}
- command = append(command, p.options...)
- command = append(command, p.upstreamURL, p.WorkingDir())
- p.cmd = newCmdJob(p, command, p.WorkingDir(), p.rsyncEnv)
- if err := p.prepareLogFile(false); err != nil {
- return err
- }
- if err := p.cmd.Start(); err != nil {
- return err
- }
- p.isRunning.Store(true)
- logger.Debugf("set isRunning to true: %s", p.Name())
- return nil
- }
|