Prechádzať zdrojové kódy

refactor(worker): use Run instead of Start and Wait

bigeagle 9 rokov pred
rodič
commit
9339fba074

+ 7 - 0
worker/cmd_provider.go

@@ -44,6 +44,13 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
 	return provider, nil
 }
 
+func (p *cmdProvider) Run() error {
+	if err := p.Start(); err != nil {
+		return err
+	}
+	return p.Wait()
+}
+
 func (p *cmdProvider) Start() error {
 	env := map[string]string{
 		"TUNASYNC_MIRROR_NAME":  p.Name(),

+ 2 - 9
worker/job.go

@@ -103,18 +103,11 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 
 			// start syncing
 			managerChan <- jobMessage{tunasync.Syncing, m.Name(), ""}
-			err = provider.Start()
-			if err != nil {
-				logger.Error(
-					"failed to start syncing job for %s: %s",
-					m.Name(), err.Error(),
-				)
-				return err
-			}
+
 			var syncErr error
 			syncDone := make(chan error, 1)
 			go func() {
-				err := provider.Wait()
+				err := provider.Run()
 				if !stopASAP {
 					syncDone <- err
 				}

+ 34 - 11
worker/provider.go

@@ -3,6 +3,7 @@ package worker
 import (
 	"errors"
 	"os"
+	"sync"
 	"time"
 )
 
@@ -21,7 +22,8 @@ type mirrorProvider interface {
 	// name
 	Name() string
 
-	// TODO: implement Run, Terminate and Hooks
+	// run mirror job in background
+	Run() error
 	// run mirror job in background
 	Start() error
 	// Wait job to finish
@@ -46,6 +48,8 @@ type mirrorProvider interface {
 }
 
 type baseProvider struct {
+	sync.Mutex
+
 	ctx      *Context
 	name     string
 	interval time.Duration
@@ -118,21 +122,35 @@ func (p *baseProvider) setLogFile() error {
 		p.cmd.SetLogFile(nil)
 		return nil
 	}
-
-	logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
-	if err != nil {
-		logger.Error("Error opening logfile %s: %s", p.LogFile(), err.Error())
-		return err
+	if p.logFile == nil {
+		logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
+		if err != nil {
+			logger.Error("Error opening logfile %s: %s", p.LogFile(), err.Error())
+			return err
+		}
+		p.logFile = logFile
 	}
-	p.logFile = logFile
-	p.cmd.SetLogFile(logFile)
+	p.cmd.SetLogFile(p.logFile)
 	return nil
 }
 
+func (p *baseProvider) Run() error {
+	panic("Not Implemented")
+}
+
+func (p *baseProvider) Start() error {
+	panic("Not Implemented")
+}
+
 func (p *baseProvider) Wait() error {
-	if p.logFile != nil {
-		defer p.logFile.Close()
-	}
+	defer func() {
+		p.Lock()
+		if p.logFile != nil {
+			p.logFile.Close()
+			p.logFile = nil
+		}
+		p.Unlock()
+	}()
 	return p.cmd.Wait()
 }
 
@@ -141,9 +159,14 @@ func (p *baseProvider) Terminate() error {
 	if p.cmd == nil {
 		return errors.New("provider command job not initialized")
 	}
+
+	p.Lock()
 	if p.logFile != nil {
 		p.logFile.Close()
+		p.logFile = nil
 	}
+	p.Unlock()
+
 	err := p.cmd.Terminate()
 	return err
 }

+ 4 - 11
worker/provider_test.go

@@ -116,9 +116,7 @@ echo $AOSP_REPO_BIN
 			So(err, ShouldBeNil)
 			So(readedScriptContent, ShouldResemble, []byte(scriptContent))
 
-			err = provider.Start()
-			So(err, ShouldBeNil)
-			err = provider.Wait()
+			err = provider.Run()
 			So(err, ShouldBeNil)
 
 			loggedContent, err := ioutil.ReadFile(provider.LogFile())
@@ -134,9 +132,7 @@ echo $AOSP_REPO_BIN
 			So(err, ShouldBeNil)
 			So(readedScriptContent, ShouldResemble, []byte(scriptContent))
 
-			err = provider.Start()
-			So(err, ShouldBeNil)
-			err = provider.Wait()
+			err = provider.Run()
 			So(err, ShouldNotBeNil)
 
 		})
@@ -148,15 +144,12 @@ sleep 5
 			err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
 			So(err, ShouldBeNil)
 
-			err = provider.Start()
-			So(err, ShouldBeNil)
-
 			go func() {
-				err = provider.Wait()
+				err = provider.Run()
 				ctx.So(err, ShouldNotBeNil)
 			}()
 
-			time.Sleep(2)
+			time.Sleep(1 * time.Second)
 			err = provider.Terminate()
 			So(err, ShouldBeNil)
 

+ 7 - 0
worker/rsync_provider.go

@@ -55,6 +55,13 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
 	return provider, nil
 }
 
+func (p *rsyncProvider) Run() error {
+	if err := p.Start(); err != nil {
+		return err
+	}
+	return p.Wait()
+}
+
 func (p *rsyncProvider) Start() error {
 	env := map[string]string{}
 	if p.password != "" {

+ 4 - 5
worker/runner.go

@@ -15,6 +15,8 @@ import (
 // it's an alternative to python-sh or go-sh
 // TODO: cgroup excution
 
+var errProcessNotStarted = errors.New("Process Not Started")
+
 type cmdJob struct {
 	cmd        *exec.Cmd
 	workingDir string
@@ -62,11 +64,8 @@ func (c *cmdJob) SetLogFile(logFile *os.File) {
 }
 
 func (c *cmdJob) Terminate() error {
-	if c.cmd == nil {
-		return nil
-	}
-	if c.cmd.Process == nil {
-		return nil
+	if c.cmd == nil || c.cmd.Process == nil {
+		return errProcessNotStarted
 	}
 	err := unix.Kill(c.cmd.Process.Pid, syscall.SIGTERM)
 	if err != nil {