Forráskód Böngészése

[bug fix] provider is not terminated if premature stop command received

z4yx 5 éve
szülő
commit
38b0156fae

+ 2 - 1
worker/base_provider.go

@@ -142,7 +142,7 @@ func (p *baseProvider) closeLogFile() (err error) {
 	return
 }
 
-func (p *baseProvider) Run() error {
+func (p *baseProvider) Run(started chan empty) error {
 	panic("Not Implemented")
 }
 
@@ -169,6 +169,7 @@ func (p *baseProvider) Terminate() error {
 	defer p.Unlock()
 	logger.Debugf("terminating provider: %s", p.Name())
 	if !p.IsRunning() {
+		logger.Warningf("Terminate() called while IsRunning is false: %s", p.Name())
 		return nil
 	}
 

+ 1 - 1
worker/cgroup_test.go

@@ -83,7 +83,7 @@ sleep 30
 		So(err, ShouldBeNil)
 
 		go func() {
-			err = provider.Run()
+			err := provider.Run(make(chan empty, 1))
 			ctx.So(err, ShouldNotBeNil)
 		}()
 

+ 3 - 1
worker/cmd_provider.go

@@ -86,12 +86,13 @@ func (p *cmdProvider) DataSize() string {
 	return p.dataSize
 }
 
-func (p *cmdProvider) Run() error {
+func (p *cmdProvider) Run(started chan empty) error {
 	p.dataSize = ""
 	defer p.closeLogFile()
 	if err := p.Start(); err != nil {
 		return err
 	}
+	started <- empty{}
 	if err := p.Wait(); err != nil {
 		return err
 	}
@@ -139,5 +140,6 @@ func (p *cmdProvider) Start() error {
 		return err
 	}
 	p.isRunning.Store(true)
+	logger.Debugf("set isRunning to true: %s", p.Name())
 	return nil
 }

+ 1 - 1
worker/docker_test.go

@@ -87,7 +87,7 @@ sleep 20
 		cmdRun("docker", []string{"images"})
 		exitedErr := make(chan error, 1)
 		go func() {
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			logger.Debugf("provider.Run() exited")
 			if err != nil {
 				logger.Errorf("provider.Run() failed: %v", err)

+ 11 - 1
worker/job.go

@@ -155,11 +155,21 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 
 			var syncErr error
 			syncDone := make(chan error, 1)
+			started := make(chan empty, 10) // we may receive "started" more than one time (e.g. two_stage_rsync)
 			go func() {
-				err := provider.Run()
+				err := provider.Run(started)
 				syncDone <- err
 			}()
 
+			select { // Wait until provider started or error happened
+			case err := <-syncDone:
+				logger.Errorf("failed to start provider %s: %s", m.Name(), err.Error())
+				syncDone <- err // it will be read again later
+			case <-started:
+				logger.Debug("provider started")
+			}
+			// Now terminating the provider is feasible
+
 			select {
 			case syncErr = <-syncDone:
 				logger.Debug("syncing done")

+ 3 - 3
worker/provider.go

@@ -24,9 +24,9 @@ type mirrorProvider interface {
 
 	Type() providerEnum
 
-	// run mirror job in background
-	Run() error
-	// run mirror job in background
+	// Start then Wait
+	Run(started chan empty) error
+	// Start the job
 	Start() error
 	// Wait job to finish
 	Wait() error

+ 23 - 17
worker/provider_test.go

@@ -96,7 +96,7 @@ exit 0
 				),
 			)
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			So(err, ShouldBeNil)
 			loggedContent, err := ioutil.ReadFile(provider.LogFile())
 			So(err, ShouldBeNil)
@@ -127,7 +127,7 @@ exit 0
 			provider, err := newRsyncProvider(c)
 			So(err, ShouldBeNil)
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			So(err, ShouldNotBeNil)
 			loggedContent, err := ioutil.ReadFile(provider.LogFile())
 			So(err, ShouldBeNil)
@@ -195,7 +195,7 @@ exit 0
 				),
 			)
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			So(err, ShouldBeNil)
 			loggedContent, err := ioutil.ReadFile(provider.LogFile())
 			So(err, ShouldBeNil)
@@ -257,7 +257,7 @@ exit 0
 				provider.WorkingDir(),
 			)
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			So(err, ShouldBeNil)
 			loggedContent, err := ioutil.ReadFile(provider.LogFile())
 			So(err, ShouldBeNil)
@@ -321,7 +321,7 @@ echo $AOSP_REPO_BIN
 			So(err, ShouldBeNil)
 			So(readedScriptContent, ShouldResemble, []byte(scriptContent))
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			So(err, ShouldBeNil)
 
 			loggedContent, err := ioutil.ReadFile(provider.LogFile())
@@ -337,7 +337,7 @@ echo $AOSP_REPO_BIN
 			So(err, ShouldBeNil)
 			So(readedScriptContent, ShouldResemble, []byte(scriptContent))
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			So(err, ShouldNotBeNil)
 
 		})
@@ -349,11 +349,14 @@ sleep 10
 			err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
 			So(err, ShouldBeNil)
 
+			started := make(chan empty, 1)
 			go func() {
-				err = provider.Run()
+				err := provider.Run(started)
 				ctx.So(err, ShouldNotBeNil)
 			}()
 
+			<-started
+			So(provider.IsRunning(), ShouldBeTrue)
 			time.Sleep(1 * time.Second)
 			err = provider.Terminate()
 			So(err, ShouldBeNil)
@@ -389,7 +392,7 @@ sleep 10
 
 		Convey("Run the command", func() {
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			So(err, ShouldBeNil)
 
 		})
@@ -417,7 +420,7 @@ sleep 10
 			provider, err := newCmdProvider(c)
 			So(err, ShouldBeNil)
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			So(err, ShouldNotBeNil)
 			So(provider.DataSize(), ShouldBeEmpty)
 		})
@@ -427,7 +430,7 @@ sleep 10
 			provider, err := newCmdProvider(c)
 			So(err, ShouldBeNil)
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			So(err, ShouldBeNil)
 		})
 
@@ -437,7 +440,7 @@ sleep 10
 			provider, err := newCmdProvider(c)
 			So(err, ShouldBeNil)
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			So(err, ShouldNotBeNil)
 		})
 
@@ -446,7 +449,7 @@ sleep 10
 			provider, err := newCmdProvider(c)
 			So(err, ShouldBeNil)
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			So(err, ShouldBeNil)
 			So(provider.DataSize(), ShouldNotBeEmpty)
 			_, err = strconv.ParseFloat(provider.DataSize(), 32)
@@ -458,7 +461,7 @@ sleep 10
 			provider, err := newCmdProvider(c)
 			So(err, ShouldBeNil)
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			So(err, ShouldBeNil)
 			So(provider.DataSize(), ShouldBeEmpty)
 		})
@@ -469,7 +472,7 @@ sleep 10
 			provider, err := newCmdProvider(c)
 			So(err, ShouldBeNil)
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 1))
 			So(err, ShouldNotBeNil)
 			So(provider.DataSize(), ShouldBeEmpty)
 		})
@@ -520,7 +523,7 @@ exit 0
 			err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
 			So(err, ShouldBeNil)
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 2))
 			So(err, ShouldBeNil)
 
 			targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
@@ -562,11 +565,14 @@ exit 0
 			err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
 			So(err, ShouldBeNil)
 
+			started := make(chan empty, 2)
 			go func() {
-				err = provider.Run()
+				err := provider.Run(started)
 				ctx.So(err, ShouldNotBeNil)
 			}()
 
+			<-started
+			So(provider.IsRunning(), ShouldBeTrue)
 			time.Sleep(1 * time.Second)
 			err = provider.Terminate()
 			So(err, ShouldBeNil)
@@ -606,7 +612,7 @@ exit 0
 			provider, err := newTwoStageRsyncProvider(c)
 			So(err, ShouldBeNil)
 
-			err = provider.Run()
+			err = provider.Run(make(chan empty, 2))
 			So(err, ShouldNotBeNil)
 			loggedContent, err := ioutil.ReadFile(provider.LogFile())
 			So(err, ShouldBeNil)

+ 3 - 1
worker/rsync_provider.go

@@ -103,12 +103,13 @@ func (p *rsyncProvider) DataSize() string {
 	return p.dataSize
 }
 
-func (p *rsyncProvider) Run() error {
+func (p *rsyncProvider) Run(started chan empty) error {
 	p.dataSize = ""
 	defer p.closeLogFile()
 	if err := p.Start(); err != nil {
 		return err
 	}
+	started <- empty{}
 	if err := p.Wait(); err != nil {
 		code, msg := internal.TranslateRsyncErrorCode(err)
 		if code != 0 {
@@ -144,5 +145,6 @@ func (p *rsyncProvider) Start() error {
 		return err
 	}
 	p.isRunning.Store(true)
+	logger.Debugf("set isRunning to true: %s", p.Name())
 	return nil
 }

+ 2 - 1
worker/two_stage_rsync_provider.go

@@ -133,7 +133,7 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
 	return options, nil
 }
 
-func (p *twoStageRsyncProvider) Run() error {
+func (p *twoStageRsyncProvider) Run(started chan empty) error {
 	p.Lock()
 	defer p.Unlock()
 
@@ -163,6 +163,7 @@ func (p *twoStageRsyncProvider) Run() error {
 		}
 		p.isRunning.Store(true)
 		logger.Debugf("set isRunning to true: %s", p.Name())
+		started <- empty{}
 
 		p.Unlock()
 		err = p.Wait()