Ver Fonte

feature(worker): LogLimiter hook

bigeagle há 9 anos atrás
pai
commit
9afd47ddcb
4 ficheiros alterados com 267 adições e 0 exclusões
  1. 12 0
      worker/job.go
  2. 108 0
      worker/loglimit_hook.go
  3. 146 0
      worker/loglimit_test.go
  4. 1 0
      worker/provider.go

+ 12 - 0
worker/job.go

@@ -44,6 +44,18 @@ func (m *mirrorJob) Name() string {
 	return m.provider.Name()
 }
 
+func (m *mirrorJob) Stopped() bool {
+	if !m.enabled {
+		return true
+	}
+	select {
+	case <-m.stopped:
+		return true
+	default:
+		return false
+	}
+}
+
 // runMirrorJob is the goroutine where syncing job runs in
 // arguments:
 //    provider: mirror provider object

+ 108 - 0
worker/loglimit_hook.go

@@ -0,0 +1,108 @@
+package worker
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"sort"
+	"strings"
+	"time"
+)
+
+// limit
+
+type logLimiter struct {
+	emptyHook
+	provider mirrorProvider
+}
+
+func newLogLimiter(provider mirrorProvider) *logLimiter {
+	return &logLimiter{
+		provider: provider,
+	}
+}
+
+type fileSlice []os.FileInfo
+
+func (f fileSlice) Len() int           { return len(f) }
+func (f fileSlice) Swap(i, j int)      { f[i], f[j] = f[j], f[i] }
+func (f fileSlice) Less(i, j int) bool { return f[i].ModTime().Before(f[j].ModTime()) }
+
+func (l *logLimiter) preExec() error {
+	logger.Debug("executing log limitter for %s", l.provider.Name())
+
+	p := l.provider
+	if p.LogFile() == "/dev/null" {
+		return nil
+	}
+
+	logDir := p.LogDir()
+	files, err := ioutil.ReadDir(logDir)
+	if err != nil {
+		if os.IsNotExist(err) {
+			os.MkdirAll(logDir, 0755)
+		} else {
+			return err
+		}
+	}
+	matchedFiles := []os.FileInfo{}
+	for _, f := range files {
+		if strings.HasPrefix(f.Name(), p.Name()) {
+			matchedFiles = append(matchedFiles, f)
+		}
+	}
+
+	// sort the filelist in time order
+	// earlier modified files are sorted as larger
+	sort.Sort(
+		sort.Reverse(
+			fileSlice(matchedFiles),
+		),
+	)
+	// remove old files
+	if len(matchedFiles) > 9 {
+		for _, f := range matchedFiles[9:] {
+			// logger.Debug(f.Name())
+			os.Remove(filepath.Join(logDir, f.Name()))
+		}
+	}
+
+	logFile := filepath.Join(
+		logDir,
+		fmt.Sprintf(
+			"%s_%s.log",
+			p.Name(),
+			time.Now().Format("2006-01-02_15_04"),
+		),
+	)
+
+	logLink := filepath.Join(logDir, "latest")
+
+	if _, err = os.Stat(logLink); err == nil {
+		os.Remove(logLink)
+	}
+	os.Symlink(logFile, logLink)
+
+	ctx := p.EnterContext()
+	ctx.Set(_LogFileKey, logFile)
+	return nil
+}
+
+func (l *logLimiter) postSuccess() error {
+	l.provider.ExitContext()
+	return nil
+}
+
+func (l *logLimiter) postFail() error {
+	logFile := l.provider.LogFile()
+	logFileFail := logFile + ".fail"
+	logDir := l.provider.LogDir()
+	logLink := filepath.Join(logDir, "latest")
+	os.Rename(logFile, logFileFail)
+	os.Remove(logLink)
+	os.Symlink(logFileFail, logLink)
+
+	l.provider.ExitContext()
+	return nil
+}

+ 146 - 0
worker/loglimit_test.go

@@ -0,0 +1,146 @@
+package worker
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	. "github.com/smartystreets/goconvey/convey"
+	. "github.com/tuna/tunasync/internal"
+)
+
+func TestLogLimiter(t *testing.T) {
+	Convey("LogLimiter should work", t, func(ctx C) {
+		tmpDir, err := ioutil.TempDir("", "tunasync")
+		tmpLogDir, err := ioutil.TempDir("", "tunasync-log")
+		defer os.RemoveAll(tmpDir)
+		defer os.RemoveAll(tmpLogDir)
+		So(err, ShouldBeNil)
+		scriptFile := filepath.Join(tmpDir, "cmd.sh")
+
+		c := cmdConfig{
+			name:        "tuna-loglimit",
+			upstreamURL: "http://mirrors.tuna.moe/",
+			command:     scriptFile,
+			workingDir:  tmpDir,
+			logDir:      tmpLogDir,
+			logFile:     filepath.Join(tmpLogDir, "latest.log"),
+			interval:    600 * time.Second,
+		}
+
+		provider, err := newCmdProvider(c)
+		So(err, ShouldBeNil)
+		limiter := newLogLimiter(provider)
+		provider.AddHook(limiter)
+
+		Convey("If logs are created simply", func() {
+			for i := 0; i < 15; i++ {
+				fn := filepath.Join(tmpLogDir, fmt.Sprintf("%s-%d.log", provider.Name(), i))
+				f, _ := os.Create(fn)
+				// time.Sleep(1 * time.Second)
+				f.Close()
+			}
+
+			matches, _ := filepath.Glob(filepath.Join(tmpLogDir, "*.log"))
+			So(len(matches), ShouldEqual, 15)
+
+			managerChan := make(chan jobMessage)
+			semaphore := make(chan empty, 1)
+			job := newMirrorJob(provider)
+
+			scriptContent := `#!/bin/bash
+echo $TUNASYNC_WORKING_DIR
+echo $TUNASYNC_MIRROR_NAME
+echo $TUNASYNC_UPSTREAM_URL
+echo $TUNASYNC_LOG_FILE
+			`
+
+			err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
+			So(err, ShouldBeNil)
+
+			go job.Run(managerChan, semaphore)
+			job.ctrlChan <- jobStart
+			msg := <-managerChan
+			So(msg.status, ShouldEqual, PreSyncing)
+			msg = <-managerChan
+			So(msg.status, ShouldEqual, Syncing)
+			logFile := provider.LogFile()
+			msg = <-managerChan
+			So(msg.status, ShouldEqual, Success)
+
+			job.ctrlChan <- jobDisable
+
+			So(logFile, ShouldNotEqual, provider.LogFile())
+
+			matches, _ = filepath.Glob(filepath.Join(tmpLogDir, "*.log"))
+			So(len(matches), ShouldEqual, 10)
+
+			expectedOutput := fmt.Sprintf(
+				"%s\n%s\n%s\n%s\n",
+				provider.WorkingDir(),
+				provider.Name(),
+				provider.upstreamURL,
+				logFile,
+			)
+
+			loggedContent, err := ioutil.ReadFile(filepath.Join(provider.LogDir(), "latest"))
+			So(err, ShouldBeNil)
+			So(string(loggedContent), ShouldEqual, expectedOutput)
+		})
+
+		Convey("If job failed simply", func() {
+			managerChan := make(chan jobMessage)
+			semaphore := make(chan empty, 1)
+			job := newMirrorJob(provider)
+
+			scriptContent := `#!/bin/bash
+echo $TUNASYNC_WORKING_DIR
+echo $TUNASYNC_MIRROR_NAME
+echo $TUNASYNC_UPSTREAM_URL
+echo $TUNASYNC_LOG_FILE
+sleep 5
+			`
+
+			err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
+			So(err, ShouldBeNil)
+
+			go job.Run(managerChan, semaphore)
+			job.ctrlChan <- jobStart
+			msg := <-managerChan
+			So(msg.status, ShouldEqual, PreSyncing)
+			msg = <-managerChan
+			So(msg.status, ShouldEqual, Syncing)
+			logFile := provider.LogFile()
+
+			time.Sleep(1 * time.Second)
+			job.ctrlChan <- jobStop
+
+			msg = <-managerChan
+			So(msg.status, ShouldEqual, Failed)
+
+			job.ctrlChan <- jobDisable
+			<-job.stopped
+
+			So(logFile, ShouldNotEqual, provider.LogFile())
+
+			expectedOutput := fmt.Sprintf(
+				"%s\n%s\n%s\n%s\n",
+				provider.WorkingDir(),
+				provider.Name(),
+				provider.upstreamURL,
+				logFile,
+			)
+
+			loggedContent, err := ioutil.ReadFile(filepath.Join(provider.LogDir(), "latest"))
+			So(err, ShouldBeNil)
+			So(string(loggedContent), ShouldEqual, expectedOutput)
+			loggedContent, err = ioutil.ReadFile(logFile + ".fail")
+			So(err, ShouldBeNil)
+			So(string(loggedContent), ShouldEqual, expectedOutput)
+		})
+
+	})
+}

+ 1 - 0
worker/provider.go

@@ -33,6 +33,7 @@ type mirrorProvider interface {
 	// job hooks
 	IsRunning() bool
 
+	AddHook(hook jobHook)
 	Hooks() []jobHook
 
 	Interval() time.Duration