Преглед изворни кода

feature(worker): two-stage-rsync provider

bigeagle пре 9 година
родитељ
комит
13161d77cf
6 измењених фајлова са 323 додато и 15 уклоњено
  1. 5 1
      worker/cmd_provider.go
  2. 16 4
      worker/provider.go
  3. 150 7
      worker/provider_test.go
  4. 15 2
      worker/rsync_provider.go
  5. 1 1
      worker/runner.go
  6. 136 0
      worker/two_stage_rsync_provider.go

+ 5 - 1
worker/cmd_provider.go

@@ -66,5 +66,9 @@ func (p *cmdProvider) Start() error {
 		return err
 	}
 
-	return p.cmd.Start()
+	if err := p.cmd.Start(); err != nil {
+		return err
+	}
+	p.isRunning.Store(true)
+	return nil
 }

+ 16 - 4
worker/provider.go

@@ -1,9 +1,9 @@
 package worker
 
 import (
-	"errors"
 	"os"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -31,6 +31,8 @@ type mirrorProvider interface {
 	// terminate mirror job
 	Terminate() error
 	// job hooks
+	IsRunning() bool
+
 	Hooks() []jobHook
 
 	Interval() time.Duration
@@ -54,7 +56,9 @@ type baseProvider struct {
 	name     string
 	interval time.Duration
 
-	cmd     *cmdJob
+	cmd       *cmdJob
+	isRunning atomic.Value
+
 	logFile *os.File
 
 	hooks []jobHook
@@ -142,9 +146,15 @@ func (p *baseProvider) Start() error {
 	panic("Not Implemented")
 }
 
+func (p *baseProvider) IsRunning() bool {
+	isRunning, _ := p.isRunning.Load().(bool)
+	return isRunning
+}
+
 func (p *baseProvider) Wait() error {
 	defer func() {
 		p.Lock()
+		p.isRunning.Store(false)
 		if p.logFile != nil {
 			p.logFile.Close()
 			p.logFile = nil
@@ -156,8 +166,8 @@ func (p *baseProvider) Wait() error {
 
 func (p *baseProvider) Terminate() error {
 	logger.Debug("terminating provider: %s", p.Name())
-	if p.cmd == nil {
-		return errors.New("provider command job not initialized")
+	if !p.IsRunning() {
+		return nil
 	}
 
 	p.Lock()
@@ -168,5 +178,7 @@ func (p *baseProvider) Terminate() error {
 	p.Unlock()
 
 	err := p.cmd.Terminate()
+	p.isRunning.Store(false)
+
 	return err
 }

+ 150 - 7
worker/provider_test.go

@@ -13,15 +13,21 @@ import (
 
 func TestRsyncProvider(t *testing.T) {
 	Convey("Rsync Provider should work", t, func() {
+		tmpDir, err := ioutil.TempDir("", "tunasync")
+		defer os.RemoveAll(tmpDir)
+		So(err, ShouldBeNil)
+		scriptFile := filepath.Join(tmpDir, "myrsync")
+		tmpFile := filepath.Join(tmpDir, "log_file")
 
 		c := rsyncConfig{
 			name:        "tuna",
 			upstreamURL: "rsync://rsync.tuna.moe/tuna/",
-			workingDir:  "/srv/mirror/production/tuna",
-			logDir:      "/var/log/tunasync",
-			logFile:     "tuna.log",
+			rsyncCmd:    scriptFile,
+			workingDir:  tmpDir,
+			logDir:      tmpDir,
+			logFile:     tmpFile,
 			useIPv6:     true,
-			interval:    600,
+			interval:    600 * time.Second,
 		}
 
 		provider, err := newRsyncProvider(c)
@@ -61,6 +67,38 @@ func TestRsyncProvider(t *testing.T) {
 			})
 		})
 
+		Convey("Let's try a run", func() {
+			scriptContent := `#!/bin/bash
+echo "syncing to $(pwd)"
+echo $@
+sleep 1
+echo "Done"
+exit 0
+			`
+			err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
+			So(err, ShouldBeNil)
+
+			expectedOutput := fmt.Sprintf(
+				"syncing to %s\n"+
+					"%s\n"+
+					"Done\n",
+				provider.WorkingDir(),
+				fmt.Sprintf(
+					"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
+						"--delete --delete-after --delay-updates --safe-links "+
+						"--timeout=120 --contimeout=120 -6 %s %s",
+					provider.upstreamURL, provider.WorkingDir(),
+				),
+			)
+
+			err = provider.Run()
+			So(err, ShouldBeNil)
+			loggedContent, err := ioutil.ReadFile(provider.LogFile())
+			So(err, ShouldBeNil)
+			So(string(loggedContent), ShouldEqual, expectedOutput)
+			// fmt.Println(string(loggedContent))
+		})
+
 	})
 }
 
@@ -79,7 +117,7 @@ func TestCmdProvider(t *testing.T) {
 			workingDir:  tmpDir,
 			logDir:      tmpDir,
 			logFile:     tmpFile,
-			interval:    600,
+			interval:    600 * time.Second,
 			env: map[string]string{
 				"AOSP_REPO_BIN": "/usr/local/bin/repo",
 			},
@@ -102,7 +140,7 @@ echo $TUNASYNC_UPSTREAM_URL
 echo $TUNASYNC_LOG_FILE
 echo $AOSP_REPO_BIN
 `
-			exceptedOutput := fmt.Sprintf(
+			expectedOutput := fmt.Sprintf(
 				"%s\n%s\n%s\n%s\n%s\n",
 				provider.WorkingDir(),
 				provider.Name(),
@@ -121,7 +159,7 @@ echo $AOSP_REPO_BIN
 
 			loggedContent, err := ioutil.ReadFile(provider.LogFile())
 			So(err, ShouldBeNil)
-			So(string(loggedContent), ShouldEqual, exceptedOutput)
+			So(string(loggedContent), ShouldEqual, expectedOutput)
 		})
 
 		Convey("If a command fails", func() {
@@ -156,3 +194,108 @@ sleep 5
 		})
 	})
 }
+
+func TestTwoStageRsyncProvider(t *testing.T) {
+	Convey("TwoStageRsync Provider should work", t, func(ctx C) {
+		tmpDir, err := ioutil.TempDir("", "tunasync")
+		defer os.RemoveAll(tmpDir)
+		So(err, ShouldBeNil)
+		scriptFile := filepath.Join(tmpDir, "myrsync")
+		tmpFile := filepath.Join(tmpDir, "log_file")
+
+		c := twoStageRsyncConfig{
+			name:          "tuna-two-stage-rsync",
+			upstreamURL:   "rsync://mirrors.tuna.moe/",
+			stage1Profile: "debian",
+			rsyncCmd:      scriptFile,
+			workingDir:    tmpDir,
+			logDir:        tmpDir,
+			logFile:       tmpFile,
+			useIPv6:       true,
+			excludeFile:   tmpFile,
+		}
+
+		provider, err := newTwoStageRsyncProvider(c)
+		So(err, ShouldBeNil)
+
+		So(provider.Name(), ShouldEqual, c.name)
+		So(provider.WorkingDir(), ShouldEqual, c.workingDir)
+		So(provider.LogDir(), ShouldEqual, c.logDir)
+		So(provider.LogFile(), ShouldEqual, c.logFile)
+		So(provider.Interval(), ShouldEqual, c.interval)
+
+		Convey("Try a command", func(ctx C) {
+			scriptContent := `#!/bin/bash
+echo "syncing to $(pwd)"
+echo $@
+sleep 1
+echo "Done"
+exit 0
+			`
+			err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
+			So(err, ShouldBeNil)
+
+			err = provider.Run()
+			So(err, ShouldBeNil)
+
+			expectedOutput := fmt.Sprintf(
+				"syncing to %s\n"+
+					"%s\n"+
+					"Done\n"+
+					"syncing to %s\n"+
+					"%s\n"+
+					"Done\n",
+				provider.WorkingDir(),
+				fmt.Sprintf(
+					"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
+						"--timeout=120 --contimeout=120 --exclude dists/ -6 "+
+						"--exclude-from %s %s %s",
+					provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
+				),
+				provider.WorkingDir(),
+				fmt.Sprintf(
+					"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
+						"--delete --delete-after --delay-updates --safe-links "+
+						"--timeout=120 --contimeout=120 -6 --exclude-from %s %s %s",
+					provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
+				),
+			)
+
+			loggedContent, err := ioutil.ReadFile(provider.LogFile())
+			So(err, ShouldBeNil)
+			So(string(loggedContent), ShouldEqual, expectedOutput)
+			// fmt.Println(string(loggedContent))
+
+		})
+		Convey("Try terminating", func(ctx C) {
+			scriptContent := `#!/bin/bash
+echo $@
+sleep 4
+exit 0
+			`
+			err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
+			So(err, ShouldBeNil)
+
+			go func() {
+				err = provider.Run()
+				ctx.So(err, ShouldNotBeNil)
+			}()
+
+			time.Sleep(1 * time.Second)
+			err = provider.Terminate()
+			So(err, ShouldBeNil)
+
+			expectedOutput := fmt.Sprintf(
+				"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
+					"--timeout=120 --contimeout=120 --exclude dists/ -6 "+
+					"--exclude-from %s %s %s\n",
+				provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
+			)
+
+			loggedContent, err := ioutil.ReadFile(provider.LogFile())
+			So(err, ShouldBeNil)
+			So(string(loggedContent), ShouldEqual, expectedOutput)
+			// fmt.Println(string(loggedContent))
+		})
+	})
+}

+ 15 - 2
worker/rsync_provider.go

@@ -1,6 +1,10 @@
 package worker
 
-import "time"
+import (
+	"errors"
+	"strings"
+	"time"
+)
 
 type rsyncConfig struct {
 	name                               string
@@ -20,6 +24,9 @@ type rsyncProvider struct {
 
 func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
 	// TODO: check config options
+	if !strings.HasSuffix(c.upstreamURL, "/") {
+		return nil, errors.New("rsync upstream URL should ends with /")
+	}
 	provider := &rsyncProvider{
 		baseProvider: baseProvider{
 			name:     c.name,
@@ -47,6 +54,7 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
 	if c.excludeFile != "" {
 		options = append(options, "--exclude-from", c.excludeFile)
 	}
+	provider.options = options
 
 	provider.ctx.Set(_WorkingDirKey, c.workingDir)
 	provider.ctx.Set(_LogDirKey, c.logDir)
@@ -63,6 +71,7 @@ func (p *rsyncProvider) Run() error {
 }
 
 func (p *rsyncProvider) Start() error {
+
 	env := map[string]string{}
 	if p.password != "" {
 		env["RSYNC_PASSWORD"] = p.password
@@ -76,5 +85,9 @@ func (p *rsyncProvider) Start() error {
 		return err
 	}
 
-	return p.cmd.Start()
+	if err := p.cmd.Start(); err != nil {
+		return err
+	}
+	p.isRunning.Store(true)
+	return nil
 }

+ 1 - 1
worker/runner.go

@@ -54,7 +54,7 @@ func (c *cmdJob) Start() error {
 
 func (c *cmdJob) Wait() error {
 	err := c.cmd.Wait()
-	c.finished <- empty{}
+	close(c.finished)
 	return err
 }
 

+ 136 - 0
worker/two_stage_rsync_provider.go

@@ -0,0 +1,136 @@
+package worker
+
+import (
+	"errors"
+	"fmt"
+	"strings"
+	"time"
+)
+
+type twoStageRsyncConfig struct {
+	name                               string
+	rsyncCmd                           string
+	stage1Profile                      string
+	upstreamURL, password, excludeFile string
+	workingDir, logDir, logFile        string
+	useIPv6                            bool
+	interval                           time.Duration
+}
+
+// An RsyncProvider provides the implementation to rsync-based syncing jobs
+type twoStageRsyncProvider struct {
+	baseProvider
+	twoStageRsyncConfig
+	stage1Options []string
+	stage2Options []string
+}
+
+var rsyncStage1Profiles = map[string]([]string){
+	"debian": []string{"dists/"},
+	"debian-oldstyle": []string{
+		"Packages*", "Sources*", "Release*",
+		"InRelease", "i18n/*", "ls-lR*", "dep11/*",
+	},
+}
+
+func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, error) {
+	// TODO: check config options
+	if !strings.HasSuffix(c.upstreamURL, "/") {
+		return nil, errors.New("rsync upstream URL should ends with /")
+	}
+
+	provider := &twoStageRsyncProvider{
+		baseProvider: baseProvider{
+			name:     c.name,
+			ctx:      NewContext(),
+			interval: c.interval,
+		},
+		twoStageRsyncConfig: c,
+		stage1Options: []string{
+			"-aHvh", "--no-o", "--no-g", "--stats",
+			"--exclude", ".~tmp~/",
+			"--safe-links", "--timeout=120", "--contimeout=120",
+		},
+		stage2Options: []string{
+			"-aHvh", "--no-o", "--no-g", "--stats",
+			"--exclude", ".~tmp~/",
+			"--delete", "--delete-after", "--delay-updates",
+			"--safe-links", "--timeout=120", "--contimeout=120",
+		},
+	}
+
+	if c.rsyncCmd == "" {
+		provider.rsyncCmd = "rsync"
+	}
+
+	provider.ctx.Set(_WorkingDirKey, c.workingDir)
+	provider.ctx.Set(_LogDirKey, c.logDir)
+	provider.ctx.Set(_LogFileKey, c.logFile)
+
+	return provider, nil
+}
+
+func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
+	var options []string
+	if stage == 1 {
+		options = append(options, p.stage1Options...)
+		stage1Excludes, ok := rsyncStage1Profiles[p.stage1Profile]
+		if !ok {
+			return nil, errors.New("Invalid Stage 1 Profile")
+		}
+		for _, exc := range stage1Excludes {
+			options = append(options, "--exclude", exc)
+		}
+
+	} else if stage == 2 {
+		options = append(options, p.stage2Options...)
+	} else {
+		return []string{}, fmt.Errorf("Invalid stage: %d", stage)
+	}
+
+	if p.useIPv6 {
+		options = append(options, "-6")
+	}
+
+	if p.excludeFile != "" {
+		options = append(options, "--exclude-from", p.excludeFile)
+	}
+
+	return options, nil
+}
+
+func (p *twoStageRsyncProvider) Run() error {
+
+	env := map[string]string{}
+	if p.password != "" {
+		env["RSYNC_PASSWORD"] = p.password
+	}
+
+	stages := []int{1, 2}
+	for _, stage := range stages {
+		command := []string{p.rsyncCmd}
+		options, err := p.Options(stage)
+		if err != nil {
+			return err
+		}
+		command = append(command, options...)
+		command = append(command, p.upstreamURL, p.WorkingDir())
+
+		p.cmd = newCmdJob(command, p.WorkingDir(), env)
+		if err := p.setLogFile(); err != nil {
+			return err
+		}
+
+		if err = p.cmd.Start(); err != nil {
+			return err
+		}
+		p.isRunning.Store(true)
+
+		err = p.cmd.Wait()
+		p.isRunning.Store(false)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}