ソースを参照

Merge pull request #209 from tuna/success_exit_code

Allow setting success exit codes globally and for each mirror (fixes #207)
Shengqi Chen 6 ヶ月 前
コミット
833027a6a0
6 ファイル変更162 行追加5 行削除
  1. 19 3
      worker/base_provider.go
  2. 6 0
      worker/config.go
  3. 56 0
      worker/config_test.go
  4. 16 0
      worker/provider.go
  5. 53 0
      worker/provider_test.go
  6. 12 2
      worker/runner.go

+ 19 - 3
worker/base_provider.go

@@ -19,9 +19,10 @@ type baseProvider struct {
 	timeout  time.Duration
 	isMaster bool
 
-	cmd       *cmdJob
-	logFileFd *os.File
-	isRunning atomic.Value
+	cmd              *cmdJob
+	logFileFd        *os.File
+	isRunning        atomic.Value
+	successExitCodes []int
 
 	cgroup *cgroupHook
 	zfs    *zfsHook
@@ -186,3 +187,18 @@ func (p *baseProvider) Terminate() error {
 func (p *baseProvider) DataSize() string {
 	return ""
 }
+
+func (p *baseProvider) SetSuccessExitCodes(codes []int) {
+	if codes == nil {
+		p.successExitCodes = []int{}
+	} else {
+		p.successExitCodes = codes
+	}
+}
+
+func (p *baseProvider) GetSuccessExitCodes() []int {
+	if p.successExitCodes == nil {
+		return []int{}
+	}
+	return p.successExitCodes
+}

+ 6 - 0
worker/config.go

@@ -63,6 +63,9 @@ type globalConfig struct {
 
 	ExecOnSuccess []string `toml:"exec_on_success"`
 	ExecOnFailure []string `toml:"exec_on_failure"`
+
+	// merged with mirror-specific options. make sure you know what you are doing!
+	SuccessExitCodes []int `toml:"dangerous_global_success_exit_codes"`
 }
 
 type managerConfig struct {
@@ -169,6 +172,9 @@ type mirrorConfig struct {
 	ExecOnSuccessExtra []string `toml:"exec_on_success_extra"`
 	ExecOnFailureExtra []string `toml:"exec_on_failure_extra"`
 
+	// will be merged with global option
+	SuccessExitCodes []int `toml:"success_exit_codes"`
+
 	Command           string   `toml:"command"`
 	FailOnMatch       string   `toml:"fail_on_match"`
 	SizePattern       string   `toml:"size_pattern"`

+ 56 - 0
worker/config_test.go

@@ -521,4 +521,60 @@ rsync_options = ["--local"]
 			"--local",       // from mirror.rsync_options
 		})
 	})
+
+	Convey("success_exit_codes should work globally and per mirror", t, func() {
+		tmpfile, err := os.CreateTemp("", "tunasync")
+		So(err, ShouldEqual, nil)
+		defer os.Remove(tmpfile.Name())
+
+		cfgBlob1 := `
+[global]
+name = "test_worker"
+log_dir = "/var/log/tunasync/{{.Name}}"
+mirror_dir = "/data/mirrors"
+concurrent = 10
+interval = 240
+retry = 3
+timeout = 86400
+dangerous_global_success_exit_codes = [10, 20]
+
+[manager]
+api_base = "https://127.0.0.1:5000"
+token = "some_token"
+
+[server]
+hostname = "worker1.example.com"
+listen_addr = "127.0.0.1"
+listen_port = 6000
+ssl_cert = "/etc/tunasync.d/worker1.cert"
+ssl_key = "/etc/tunasync.d/worker1.key"
+
+[[mirrors]]
+name = "foo"
+provider = "rsync"
+upstream = "rsync://foo.bar/"
+interval = 720
+retry = 2
+timeout = 3600
+mirror_dir = "/data/foo"
+success_exit_codes = [30, 40]
+`
+
+		err = os.WriteFile(tmpfile.Name(), []byte(cfgBlob1), 0644)
+		So(err, ShouldEqual, nil)
+		defer tmpfile.Close()
+
+		cfg, err := LoadConfig(tmpfile.Name())
+		So(err, ShouldBeNil)
+
+		providers := map[string]mirrorProvider{}
+		for _, m := range cfg.Mirrors {
+			p := newMirrorProvider(m, cfg)
+			providers[p.Name()] = p
+		}
+
+		p, ok := providers["foo"].(*rsyncProvider)
+		So(ok, ShouldBeTrue)
+		So(p.successExitCodes, ShouldResemble, []int{10, 20, 30, 40})
+	})
 }

+ 16 - 0
worker/provider.go

@@ -60,6 +60,10 @@ type mirrorProvider interface {
 	ExitContext() *Context
 	// return context
 	Context() *Context
+
+	// set in newMirrorProvider, used by cmdJob.Wait
+	SetSuccessExitCodes(codes []int)
+	GetSuccessExitCodes() []int
 }
 
 // newProvider creates a mirrorProvider instance
@@ -249,5 +253,17 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 	}
 	addHookFromCmdList(mirror.ExecOnFailureExtra, execOnFailure)
 
+	successExitCodes := []int{}
+	if cfg.Global.SuccessExitCodes != nil {
+		successExitCodes = append(successExitCodes, cfg.Global.SuccessExitCodes...)
+	}
+	if mirror.SuccessExitCodes != nil {
+		successExitCodes = append(successExitCodes, mirror.SuccessExitCodes...)
+	}
+	if len(successExitCodes) > 0 {
+		logger.Infof("Non-zero success exit codes set for mirror %s: %v", mirror.Name, successExitCodes)
+		provider.SetSuccessExitCodes(successExitCodes)
+	}
+
 	return provider
 }

+ 53 - 0
worker/provider_test.go

@@ -552,6 +552,59 @@ sleep 10
 			So(provider.DataSize(), ShouldBeEmpty)
 		})
 	})
+	Convey("Command Provider with successExitCodes should work", t, func(ctx C) {
+		tmpDir, err := os.MkdirTemp("", "tunasync")
+		defer os.RemoveAll(tmpDir)
+		So(err, ShouldBeNil)
+		scriptFile := filepath.Join(tmpDir, "cmd.sh")
+		tmpFile := filepath.Join(tmpDir, "log_file")
+
+		c := cmdConfig{
+			name:        "tuna-cmd",
+			upstreamURL: "http://mirrors.tuna.moe/",
+			command:     "bash " + scriptFile,
+			workingDir:  tmpDir,
+			logDir:      tmpDir,
+			logFile:     tmpFile,
+			interval:    600 * time.Second,
+		}
+
+		provider, err := newCmdProvider(c)
+		provider.SetSuccessExitCodes([]int{199, 200})
+		So(err, ShouldBeNil)
+
+		So(provider.Type(), ShouldEqual, provCommand)
+		So(provider.Name(), ShouldEqual, c.name)
+		So(provider.WorkingDir(), ShouldEqual, c.workingDir)
+		So(provider.LogDir(), ShouldEqual, c.logDir)
+		So(provider.LogFile(), ShouldEqual, c.logFile)
+		So(provider.Interval(), ShouldEqual, c.interval)
+		So(provider.GetSuccessExitCodes(), ShouldResemble, []int{199, 200})
+
+		Convey("Command exits with configured successExitCodes", func() {
+			scriptContent := `exit 199`
+			err = os.WriteFile(scriptFile, []byte(scriptContent), 0755)
+			So(err, ShouldBeNil)
+			readedScriptContent, err := os.ReadFile(scriptFile)
+			So(err, ShouldBeNil)
+			So(readedScriptContent, ShouldResemble, []byte(scriptContent))
+
+			err = provider.Run(make(chan empty, 1))
+			So(err, ShouldBeNil)
+		})
+
+		Convey("Command exits with unknown exit code", func() {
+			scriptContent := `exit 201`
+			err = os.WriteFile(scriptFile, []byte(scriptContent), 0755)
+			So(err, ShouldBeNil)
+			readedScriptContent, err := os.ReadFile(scriptFile)
+			So(err, ShouldBeNil)
+			So(readedScriptContent, ShouldResemble, []byte(scriptContent))
+
+			err = provider.Run(make(chan empty, 1))
+			So(err, ShouldNotBeNil)
+		})
+	})
 }
 
 func TestTwoStageRsyncProvider(t *testing.T) {

+ 12 - 2
worker/runner.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"os"
 	"os/exec"
+	"slices"
 	"strings"
 	"sync"
 	"syscall"
@@ -171,9 +172,18 @@ func (c *cmdJob) Wait() error {
 		return c.retErr
 	default:
 		err := c.cmd.Wait()
-		c.retErr = err
 		close(c.finished)
-		return err
+		if err != nil {
+			code := err.(*exec.ExitError).ExitCode()
+			allowedCodes := c.provider.GetSuccessExitCodes()
+			if slices.Contains(allowedCodes, code) {
+				// process exited with non-success status
+				logger.Infof("Command %s exited with code %d: treated as success (allowed: %v)", c.cmd.Args, code, allowedCodes)
+			} else {
+				c.retErr = err
+			}
+		}
+		return c.retErr
 	}
 }