Prechádzať zdrojové kódy

Merge pull request #37 from tuna/dev

Dev
bigeagle 9 rokov pred
rodič
commit
0db8fc6614

+ 36 - 14
worker/cgroup.go

@@ -2,18 +2,20 @@ package worker
 
 import (
 	"bufio"
+	"errors"
 	"fmt"
 	"os"
 	"path/filepath"
 	"strconv"
 	"syscall"
+	"time"
 
 	"golang.org/x/sys/unix"
 
 	"github.com/codeskyblue/go-sh"
 )
 
-var cgSubsystem string = "cpu"
+var cgSubsystem = "cpu"
 
 type cgroupHook struct {
 	emptyHook
@@ -82,23 +84,43 @@ func (c *cgroupHook) killAll() error {
 		return nil
 	}
 	name := c.provider.Name()
-	taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks"))
-	if err != nil {
-		return err
+
+	readTaskList := func() ([]int, error) {
+		taskList := []int{}
+		taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks"))
+		if err != nil {
+			return taskList, err
+		}
+		defer taskFile.Close()
+
+		scanner := bufio.NewScanner(taskFile)
+		for scanner.Scan() {
+			pid, err := strconv.Atoi(scanner.Text())
+			if err != nil {
+				return taskList, err
+			}
+			taskList = append(taskList, pid)
+		}
+		return taskList, nil
 	}
-	defer taskFile.Close()
-	taskList := []int{}
-	scanner := bufio.NewScanner(taskFile)
-	for scanner.Scan() {
-		pid, err := strconv.Atoi(scanner.Text())
+
+	for i := 0; i < 4; i++ {
+		if i == 3 {
+			return errors.New("Unable to kill all child tasks")
+		}
+		taskList, err := readTaskList()
 		if err != nil {
 			return err
 		}
-		taskList = append(taskList, pid)
-	}
-	for _, pid := range taskList {
-		logger.Debugf("Killing process: %d", pid)
-		unix.Kill(pid, syscall.SIGKILL)
+		if len(taskList) == 0 {
+			return nil
+		}
+		for _, pid := range taskList {
+			logger.Debugf("Killing process: %d", pid)
+			unix.Kill(pid, syscall.SIGKILL)
+		}
+		// sleep 10ms for the first round, and 1.01s, 2.01s, 3.01s for the rest
+		time.Sleep(time.Duration(i)*time.Second + 10*time.Millisecond)
 	}
 
 	return nil

+ 1 - 1
worker/loglimit_hook.go

@@ -79,7 +79,7 @@ func (l *logLimiter) preExec() error {
 
 	logLink := filepath.Join(logDir, "latest")
 
-	if _, err = os.Stat(logLink); err == nil {
+	if _, err = os.Lstat(logLink); err == nil {
 		os.Remove(logLink)
 	}
 	os.Symlink(logFileName, logLink)

+ 15 - 3
worker/runner.go

@@ -5,6 +5,7 @@ import (
 	"os"
 	"os/exec"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
 
@@ -17,12 +18,14 @@ import (
 var errProcessNotStarted = errors.New("Process Not Started")
 
 type cmdJob struct {
+	sync.Mutex
 	cmd        *exec.Cmd
 	workingDir string
 	env        map[string]string
 	logFile    *os.File
 	finished   chan empty
 	provider   mirrorProvider
+	retErr     error
 }
 
 func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob {
@@ -69,9 +72,18 @@ func (c *cmdJob) Start() error {
 }
 
 func (c *cmdJob) Wait() error {
-	err := c.cmd.Wait()
-	close(c.finished)
-	return err
+	c.Lock()
+	defer c.Unlock()
+
+	select {
+	case <-c.finished:
+		return c.retErr
+	default:
+		err := c.cmd.Wait()
+		c.retErr = err
+		close(c.finished)
+		return err
+	}
 }
 
 func (c *cmdJob) SetLogFile(logFile *os.File) {

+ 1 - 0
worker/two_stage_rsync_provider.go

@@ -108,6 +108,7 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
 }
 
 func (p *twoStageRsyncProvider) Run() error {
+	defer p.Wait()
 
 	env := map[string]string{}
 	if p.username != "" {