Browse Source

feature(worker): implemented Worker object, worker side code is almost done

bigeagle 9 năm trước cách đây
mục cha
commit
ce3471e30d

+ 4 - 2
internal/msg.go

@@ -4,7 +4,7 @@ import "time"
 
 // A StatusUpdateMsg represents a msg when
 // a worker has done syncing
-type StatusUpdateMsg struct {
+type MirrorStatus struct {
 	Name       string     `json:"name"`
 	Worker     string     `json:"worker"`
 	IsMaster   bool       `json:"is_master"`
@@ -19,7 +19,9 @@ type StatusUpdateMsg struct {
 // a worker, and sent from the manager to clients.
 type WorkerInfoMsg struct {
 	ID         string    `json:"id"`
-	LastOnline time.Time `json:"last_online"`
+	URL        string    `json:"url"`         // worker url
+	Token      string    `json:"token"`       // session token
+	LastOnline time.Time `json:"last_online"` // last seen
 }
 
 type CmdVerb uint8

+ 79 - 0
internal/util.go

@@ -0,0 +1,79 @@
+package internal
+
+import (
+	"bytes"
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/json"
+	"errors"
+	"io/ioutil"
+	"net/http"
+)
+
+// GetTLSConfig generate tls.Config from CAFile
+func GetTLSConfig(CAFile string) (*tls.Config, error) {
+	caCert, err := ioutil.ReadFile(CAFile)
+	if err != nil {
+		return nil, err
+	}
+	caCertPool := x509.NewCertPool()
+	if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
+		return nil, errors.New("Failed to add CA to pool")
+	}
+
+	tlsConfig := &tls.Config{
+		RootCAs: caCertPool,
+	}
+	tlsConfig.BuildNameToCertificate()
+	return tlsConfig, nil
+}
+
+// PostJSON posts json object to url
+func PostJSON(url string, obj interface{}, tlsConfig *tls.Config) (*http.Response, error) {
+	var client *http.Client
+	if tlsConfig == nil {
+		client = &http.Client{}
+	} else {
+		tr := &http.Transport{
+			TLSClientConfig: tlsConfig,
+		}
+		client = &http.Client{
+			Transport: tr,
+		}
+	}
+
+	b := new(bytes.Buffer)
+	if err := json.NewEncoder(b).Encode(obj); err != nil {
+		return nil, err
+	}
+	return client.Post(url, "application/json; charset=utf-8", b)
+}
+
+// GetJSON gets a json response from url
+func GetJSON(url string, obj interface{}, tlsConfig *tls.Config) (*http.Response, error) {
+	var client *http.Client
+	if tlsConfig == nil {
+		client = &http.Client{}
+	} else {
+		tr := &http.Transport{
+			TLSClientConfig: tlsConfig,
+		}
+		client = &http.Client{
+			Transport: tr,
+		}
+	}
+
+	resp, err := client.Get(url)
+	if err != nil {
+		return resp, err
+	}
+	if resp.StatusCode != http.StatusOK {
+		return resp, errors.New("HTTP status code is not 200")
+	}
+	defer resp.Body.Close()
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return resp, err
+	}
+	return resp, json.Unmarshal(body, obj)
+}

+ 4 - 1
manager/server.go

@@ -47,7 +47,10 @@ func (s *managerServer) listWorkers(c *gin.Context) {
 	}
 	for _, w := range workers {
 		workerInfos = append(workerInfos,
-			WorkerInfoMsg{w.ID, w.LastOnline})
+			WorkerInfoMsg{
+				ID:         w.ID,
+				LastOnline: w.LastOnline,
+			})
 	}
 	c.JSON(http.StatusOK, workerInfos)
 }

+ 4 - 0
worker/cmd_provider.go

@@ -44,6 +44,10 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
 	return provider, nil
 }
 
+func (p *cmdProvider) Upstream() string {
+	return p.upstreamURL
+}
+
 func (p *cmdProvider) Run() error {
 	if err := p.Start(); err != nil {
 		return err

+ 9 - 4
worker/config_test.go

@@ -110,16 +110,21 @@ exclude_file = "/etc/tunasync.d/fedora-exclude.txt"
 		cfg, err := loadConfig(tmpfile.Name())
 		So(err, ShouldBeNil)
 
-		providers := initProviders(cfg)
+		w := &Worker{
+			cfg:       cfg,
+			providers: make(map[string]mirrorProvider),
+		}
 
-		p := providers[0]
+		w.initProviders()
+
+		p := w.providers["AOSP"]
 		So(p.Name(), ShouldEqual, "AOSP")
 		So(p.LogDir(), ShouldEqual, "/var/log/tunasync/AOSP")
 		So(p.LogFile(), ShouldEqual, "/var/log/tunasync/AOSP/latest.log")
 		_, ok := p.(*cmdProvider)
 		So(ok, ShouldBeTrue)
 
-		p = providers[1]
+		p = w.providers["debian"]
 		So(p.Name(), ShouldEqual, "debian")
 		So(p.LogDir(), ShouldEqual, "/var/log/tunasync/debian")
 		So(p.LogFile(), ShouldEqual, "/var/log/tunasync/debian/latest.log")
@@ -128,7 +133,7 @@ exclude_file = "/etc/tunasync.d/fedora-exclude.txt"
 		So(r2p.stage1Profile, ShouldEqual, "debian")
 		So(r2p.WorkingDir(), ShouldEqual, "/data/mirrors/debian")
 
-		p = providers[2]
+		p = w.providers["fedora"]
 		So(p.Name(), ShouldEqual, "fedora")
 		So(p.LogDir(), ShouldEqual, "/var/log/tunasync/fedora")
 		So(p.LogFile(), ShouldEqual, "/var/log/tunasync/fedora/latest.log")

+ 7 - 5
worker/job.go

@@ -28,7 +28,7 @@ type jobMessage struct {
 type mirrorJob struct {
 	provider mirrorProvider
 	ctrlChan chan ctrlAction
-	stopped  chan empty
+	disabled chan empty
 	enabled  bool
 }
 
@@ -44,12 +44,12 @@ func (m *mirrorJob) Name() string {
 	return m.provider.Name()
 }
 
-func (m *mirrorJob) Stopped() bool {
+func (m *mirrorJob) Disabled() bool {
 	if !m.enabled {
 		return true
 	}
 	select {
-	case <-m.stopped:
+	case <-m.disabled:
 		return true
 	default:
 		return false
@@ -65,8 +65,8 @@ func (m *mirrorJob) Stopped() bool {
 // TODO: message struct for managerChan
 func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
 
-	m.stopped = make(chan empty)
-	defer close(m.stopped)
+	m.disabled = make(chan empty)
+	defer close(m.disabled)
 
 	provider := m.provider
 
@@ -210,6 +210,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 					close(kill)
 					<-jobDone
 				case jobDisable:
+					m.enabled = false
 					close(kill)
 					<-jobDone
 					return nil
@@ -234,6 +235,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 		case jobStop:
 			m.enabled = false
 		case jobDisable:
+			m.enabled = false
 			return nil
 		case jobRestart:
 			m.enabled = true

+ 3 - 3
worker/job_test.go

@@ -105,7 +105,7 @@ func TestMirrorJob(t *testing.T) {
 				select {
 				case <-managerChan:
 					So(0, ShouldEqual, 1) // made this fail
-				case <-job.stopped:
+				case <-job.disabled:
 					So(0, ShouldEqual, 0)
 				}
 			})
@@ -145,7 +145,7 @@ echo $TUNASYNC_WORKING_DIR
 				So(err, ShouldBeNil)
 				So(string(loggedContent), ShouldEqual, exceptedOutput)
 				job.ctrlChan <- jobDisable
-				<-job.stopped
+				<-job.disabled
 			})
 
 			Convey("If we don't kill it", func(ctx C) {
@@ -168,7 +168,7 @@ echo $TUNASYNC_WORKING_DIR
 				So(err, ShouldBeNil)
 				So(string(loggedContent), ShouldEqual, exceptedOutput)
 				job.ctrlChan <- jobDisable
-				<-job.stopped
+				<-job.disabled
 			})
 		})
 

+ 1 - 1
worker/loglimit_test.go

@@ -122,7 +122,7 @@ sleep 5
 			So(msg.status, ShouldEqual, Failed)
 
 			job.ctrlChan <- jobDisable
-			<-job.stopped
+			<-job.disabled
 
 			So(logFile, ShouldNotEqual, provider.LogFile())
 

+ 0 - 108
worker/main.go

@@ -1,108 +0,0 @@
-package worker
-
-import (
-	"bytes"
-	"errors"
-	"html/template"
-	"path/filepath"
-	"time"
-)
-
-// toplevel module for workers
-
-func initProviders(c *Config) []mirrorProvider {
-
-	formatLogDir := func(logDir string, m mirrorConfig) string {
-		tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
-		if err != nil {
-			panic(err)
-		}
-		var formatedLogDir bytes.Buffer
-		tmpl.Execute(&formatedLogDir, m)
-		return formatedLogDir.String()
-	}
-
-	providers := []mirrorProvider{}
-
-	for _, mirror := range c.Mirrors {
-		logDir := mirror.LogDir
-		mirrorDir := mirror.MirrorDir
-		if logDir == "" {
-			logDir = c.Global.LogDir
-		}
-		if mirrorDir == "" {
-			mirrorDir = c.Global.MirrorDir
-		}
-		logDir = formatLogDir(logDir, mirror)
-		switch mirror.Provider {
-		case ProvCommand:
-			pc := cmdConfig{
-				name:        mirror.Name,
-				upstreamURL: mirror.Upstream,
-				command:     mirror.Command,
-				workingDir:  filepath.Join(mirrorDir, mirror.Name),
-				logDir:      logDir,
-				logFile:     filepath.Join(logDir, "latest.log"),
-				interval:    time.Duration(mirror.Interval) * time.Minute,
-				env:         mirror.Env,
-			}
-			p, err := newCmdProvider(pc)
-			if err != nil {
-				panic(err)
-			}
-			providers = append(providers, p)
-		case ProvRsync:
-			rc := rsyncConfig{
-				name:        mirror.Name,
-				upstreamURL: mirror.Upstream,
-				password:    mirror.Password,
-				excludeFile: mirror.ExcludeFile,
-				workingDir:  filepath.Join(mirrorDir, mirror.Name),
-				logDir:      logDir,
-				logFile:     filepath.Join(logDir, "latest.log"),
-				useIPv6:     mirror.UseIPv6,
-				interval:    time.Duration(mirror.Interval) * time.Minute,
-			}
-			p, err := newRsyncProvider(rc)
-			if err != nil {
-				panic(err)
-			}
-			providers = append(providers, p)
-		case ProvTwoStageRsync:
-			rc := twoStageRsyncConfig{
-				name:          mirror.Name,
-				stage1Profile: mirror.Stage1Profile,
-				upstreamURL:   mirror.Upstream,
-				password:      mirror.Password,
-				excludeFile:   mirror.ExcludeFile,
-				workingDir:    filepath.Join(mirrorDir, mirror.Name),
-				logDir:        logDir,
-				logFile:       filepath.Join(logDir, "latest.log"),
-				useIPv6:       mirror.UseIPv6,
-				interval:      time.Duration(mirror.Interval) * time.Minute,
-			}
-			p, err := newTwoStageRsyncProvider(rc)
-			if err != nil {
-				panic(err)
-			}
-			providers = append(providers, p)
-		default:
-			panic(errors.New("Invalid mirror provider"))
-
-		}
-
-	}
-	return providers
-}
-
-func main() {
-
-	for {
-		// if time.Now().After() {
-		//
-		// }
-
-		time.Sleep(1 * time.Second)
-	}
-
-}

+ 1 - 0
worker/provider.go

@@ -21,6 +21,7 @@ const (
 type mirrorProvider interface {
 	// name
 	Name() string
+	Upstream() string
 
 	// run mirror job in background
 	Run() error

+ 4 - 0
worker/rsync_provider.go

@@ -63,6 +63,10 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
 	return provider, nil
 }
 
+func (p *rsyncProvider) Upstream() string {
+	return p.upstreamURL
+}
+
 func (p *rsyncProvider) Run() error {
 	if err := p.Start(); err != nil {
 		return err

+ 4 - 0
worker/two_stage_rsync_provider.go

@@ -70,6 +70,10 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er
 	return provider, nil
 }
 
+func (p *twoStageRsyncProvider) Upstream() string {
+	return p.upstreamURL
+}
+
 func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
 	var options []string
 	if stage == 1 {

+ 342 - 0
worker/worker.go

@@ -0,0 +1,342 @@
+package worker
+
+import (
+	"bytes"
+	"crypto/tls"
+	"errors"
+	"fmt"
+	"html/template"
+	"net/http"
+	"path/filepath"
+	"time"
+
+	"github.com/gin-gonic/gin"
+	. "github.com/tuna/tunasync/internal"
+)
+
+var tunasyncWorker *Worker
+
+// A Worker is a instance of tunasync worker
+type Worker struct {
+	cfg       *Config
+	providers map[string]mirrorProvider
+	jobs      map[string]*mirrorJob
+
+	managerChan chan jobMessage
+	semaphore   chan empty
+
+	schedule   *scheduleQueue
+	httpServer *gin.Engine
+	tlsConfig  *tls.Config
+
+	mirrorStatus map[string]SyncStatus
+}
+
+// GetTUNASyncWorker returns a singalton worker
+func GetTUNASyncWorker(cfg *Config) *Worker {
+	if tunasyncWorker != nil {
+		return tunasyncWorker
+	}
+
+	w := &Worker{
+		cfg:       cfg,
+		providers: make(map[string]mirrorProvider),
+		jobs:      make(map[string]*mirrorJob),
+
+		managerChan: make(chan jobMessage, 32),
+		semaphore:   make(chan empty, cfg.Global.Concurrent),
+
+		schedule:     newScheduleQueue(),
+		mirrorStatus: make(map[string]SyncStatus),
+	}
+	w.initJobs()
+	w.makeHTTPServer()
+	tunasyncWorker = w
+	return w
+}
+
+func (w *Worker) initProviders() {
+	c := w.cfg
+
+	formatLogDir := func(logDir string, m mirrorConfig) string {
+		tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
+		if err != nil {
+			panic(err)
+		}
+		var formatedLogDir bytes.Buffer
+		tmpl.Execute(&formatedLogDir, m)
+		return formatedLogDir.String()
+	}
+
+	for _, mirror := range c.Mirrors {
+		logDir := mirror.LogDir
+		mirrorDir := mirror.MirrorDir
+		if logDir == "" {
+			logDir = c.Global.LogDir
+		}
+		if mirrorDir == "" {
+			mirrorDir = c.Global.MirrorDir
+		}
+		logDir = formatLogDir(logDir, mirror)
+
+		var provider mirrorProvider
+
+		switch mirror.Provider {
+		case ProvCommand:
+			pc := cmdConfig{
+				name:        mirror.Name,
+				upstreamURL: mirror.Upstream,
+				command:     mirror.Command,
+				workingDir:  filepath.Join(mirrorDir, mirror.Name),
+				logDir:      logDir,
+				logFile:     filepath.Join(logDir, "latest.log"),
+				interval:    time.Duration(mirror.Interval) * time.Minute,
+				env:         mirror.Env,
+			}
+			p, err := newCmdProvider(pc)
+			if err != nil {
+				panic(err)
+			}
+			provider = p
+		case ProvRsync:
+			rc := rsyncConfig{
+				name:        mirror.Name,
+				upstreamURL: mirror.Upstream,
+				password:    mirror.Password,
+				excludeFile: mirror.ExcludeFile,
+				workingDir:  filepath.Join(mirrorDir, mirror.Name),
+				logDir:      logDir,
+				logFile:     filepath.Join(logDir, "latest.log"),
+				useIPv6:     mirror.UseIPv6,
+				interval:    time.Duration(mirror.Interval) * time.Minute,
+			}
+			p, err := newRsyncProvider(rc)
+			if err != nil {
+				panic(err)
+			}
+			provider = p
+		case ProvTwoStageRsync:
+			rc := twoStageRsyncConfig{
+				name:          mirror.Name,
+				stage1Profile: mirror.Stage1Profile,
+				upstreamURL:   mirror.Upstream,
+				password:      mirror.Password,
+				excludeFile:   mirror.ExcludeFile,
+				workingDir:    filepath.Join(mirrorDir, mirror.Name),
+				logDir:        logDir,
+				logFile:       filepath.Join(logDir, "latest.log"),
+				useIPv6:       mirror.UseIPv6,
+				interval:      time.Duration(mirror.Interval) * time.Minute,
+			}
+			p, err := newTwoStageRsyncProvider(rc)
+			if err != nil {
+				panic(err)
+			}
+			provider = p
+		default:
+			panic(errors.New("Invalid mirror provider"))
+
+		}
+
+		provider.AddHook(newLogLimiter(provider))
+		w.providers[provider.Name()] = provider
+
+	}
+}
+
+func (w *Worker) initJobs() {
+	w.initProviders()
+
+	for name, provider := range w.providers {
+		w.jobs[name] = newMirrorJob(provider)
+		go w.jobs[name].Run(w.managerChan, w.semaphore)
+		w.mirrorStatus[name] = Paused
+	}
+}
+
+// Ctrl server receives commands from the manager
+func (w *Worker) makeHTTPServer() {
+	s := gin.New()
+	s.Use(gin.Recovery())
+
+	s.POST("/", func(c *gin.Context) {
+		var cmd WorkerCmd
+
+		if err := c.BindJSON(&cmd); err != nil {
+			c.JSON(http.StatusBadRequest, gin.H{"msg": "Invalid request"})
+			return
+		}
+		job, ok := w.jobs[cmd.MirrorID]
+		if !ok {
+			c.JSON(http.StatusNotFound, gin.H{"msg": fmt.Sprintf("Mirror ``%s'' not found", cmd.MirrorID)})
+			return
+		}
+		// if job disabled, start them first
+		switch cmd.Cmd {
+		case CmdStart, CmdRestart:
+			if job.Disabled() {
+				go job.Run(w.managerChan, w.semaphore)
+			}
+		}
+		switch cmd.Cmd {
+		case CmdStart:
+			job.ctrlChan <- jobStart
+		case CmdStop:
+			job.ctrlChan <- jobStop
+		case CmdRestart:
+			job.ctrlChan <- jobRestart
+		case CmdDisable:
+			w.schedule.Remove(job.Name())
+			job.ctrlChan <- jobDisable
+			<-job.disabled
+		case CmdPing:
+			job.ctrlChan <- jobStart
+		default:
+			c.JSON(http.StatusNotAcceptable, gin.H{"msg": "Invalid Command"})
+			return
+		}
+
+		c.JSON(http.StatusOK, gin.H{"msg": "OK"})
+	})
+
+	w.httpServer = s
+}
+
+func (w *Worker) runHTTPServer() {
+	addr := fmt.Sprintf("%s:%d", w.cfg.Server.Addr, w.cfg.Server.Port)
+
+	if w.cfg.Server.SSLCert == "" && w.cfg.Server.SSLKey == "" {
+		if err := w.httpServer.Run(addr); err != nil {
+			panic(err)
+		}
+	} else {
+		if err := w.httpServer.RunTLS(addr, w.cfg.Server.SSLCert, w.cfg.Server.SSLKey); err != nil {
+			panic(err)
+		}
+	}
+}
+
+// Run runs worker forever
+func (w *Worker) Run() {
+	w.registorWorker()
+	go w.runHTTPServer()
+	w.runSchedule()
+}
+
+func (w *Worker) runSchedule() {
+	mirrorList := w.fetchJobStatus()
+	unset := make(map[string]bool)
+	for name := range w.jobs {
+		unset[name] = true
+	}
+	for _, m := range mirrorList {
+		if job, ok := w.jobs[m.Name]; ok {
+			stime := m.LastUpdate.Add(job.provider.Interval())
+			w.schedule.AddJob(stime, job)
+			delete(unset, m.Name)
+		}
+	}
+	for name := range unset {
+		job := w.jobs[name]
+		w.schedule.AddJob(time.Now(), job)
+	}
+
+	for {
+		select {
+		case jobMsg := <-w.managerChan:
+			// got status update from job
+			w.updateStatus(jobMsg)
+			status := w.mirrorStatus[jobMsg.name]
+			if status == Disabled || status == Paused {
+				continue
+			}
+			w.mirrorStatus[jobMsg.name] = jobMsg.status
+			switch jobMsg.status {
+			case Success, Failed:
+				job := w.jobs[jobMsg.name]
+				w.schedule.AddJob(
+					time.Now().Add(job.provider.Interval()),
+					job,
+				)
+			}
+
+		case <-time.Tick(10 * time.Second):
+			if job := w.schedule.Pop(); job != nil {
+				job.ctrlChan <- jobStart
+			}
+		}
+
+	}
+
+}
+
+// Name returns worker name
+func (w *Worker) Name() string {
+	return w.cfg.Global.Name
+}
+
+// URL returns the url to http server of the worker
+func (w *Worker) URL() string {
+	proto := "https"
+	if w.cfg.Server.SSLCert == "" && w.cfg.Server.SSLKey == "" {
+		proto = "http"
+	}
+
+	return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port)
+}
+
+func (w *Worker) registorWorker() {
+	url := fmt.Sprintf(
+		"%s/workers",
+		w.cfg.Manager.APIBase,
+	)
+
+	msg := WorkerInfoMsg{
+		ID:  w.Name(),
+		URL: w.URL(),
+	}
+
+	if _, err := PostJSON(url, msg, w.tlsConfig); err != nil {
+		logger.Error("Failed to register worker")
+	}
+}
+
+func (w *Worker) updateStatus(jobMsg jobMessage) {
+	url := fmt.Sprintf(
+		"%s/%s/jobs/%s",
+		w.cfg.Manager.APIBase,
+		w.Name(),
+		jobMsg.name,
+	)
+	p := w.providers[jobMsg.name]
+	smsg := MirrorStatus{
+		Name:       jobMsg.name,
+		Worker:     w.cfg.Global.Name,
+		IsMaster:   true,
+		Status:     jobMsg.status,
+		LastUpdate: time.Now(),
+		Upstream:   p.Upstream(),
+		Size:       "unknown",
+		ErrorMsg:   jobMsg.msg,
+	}
+
+	if _, err := PostJSON(url, smsg, w.tlsConfig); err != nil {
+		logger.Error("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error())
+	}
+}
+
+func (w *Worker) fetchJobStatus() []MirrorStatus {
+	var mirrorList []MirrorStatus
+
+	url := fmt.Sprintf(
+		"%s/%s/jobs",
+		w.cfg.Manager.APIBase,
+		w.Name(),
+	)
+
+	if _, err := GetJSON(url, &mirrorList, w.tlsConfig); err != nil {
+		logger.Error("Failed to fetch job status: %s", err.Error())
+	}
+
+	return mirrorList
+}