Przeglądaj źródła

feature(manager): implement manager server, to be tested

walkerning 9 lat temu
rodzic
commit
bf31e168a2
6 zmienionych plików z 340 dodań i 24 usunięć
  1. 10 4
      internal/msg.go
  2. 5 0
      manager/config.go
  3. 56 6
      manager/db.go
  4. 198 14
      manager/server.go
  5. 67 0
      manager/server_test.go
  6. 4 0
      manager/status.go

+ 10 - 4
internal/msg.go

@@ -15,9 +15,10 @@ type StatusUpdateMsg struct {
 	ErrorMsg   string     `json:"error_msg"`
 }
 
-// A WorkerInfoMsg is
+// A WorkerInfoMsg is the information struct that describe
+// a worker, and sent from the manager to clients.
 type WorkerInfoMsg struct {
-	Name string `json:"name"`
+	ID string `json:"id"`
 }
 
 type CmdVerb uint8
@@ -30,11 +31,16 @@ const (
 	CmdPing            // ensure the goroutine is alive
 )
 
+// A WorkerCmd is the command message send from the
+// manager to a worker
 type WorkerCmd struct {
-	Cmd  CmdVerb  `json:"cmd"`
-	Args []string `json:"args"`
+	Cmd      CmdVerb  `json:"cmd"`
+	MirrorID string   `json:"mirror_id"`
+	Args     []string `json:"args"`
 }
 
+// A ClientCmd is the command message send from client
+// to the manager
 type ClientCmd struct {
 	Cmd      CmdVerb  `json:"cmd"`
 	MirrorID string   `json:"mirror_id"`

+ 5 - 0
manager/config.go

@@ -24,6 +24,7 @@ type ServerConfig struct {
 type FileConfig struct {
 	StatusFile string `toml:"status_file"`
 	DBFile     string `toml:"db_file"`
+	DBType     string `toml:"db_type"`
 	// used to connect to worker
 	CACert string `toml:"ca_cert"`
 }
@@ -36,6 +37,7 @@ func loadConfig(cfgFile string, c *cli.Context) (*Config, error) {
 	cfg.Debug = false
 	cfg.Files.StatusFile = "/var/lib/tunasync/tunasync.json"
 	cfg.Files.DBFile = "/var/lib/tunasync/tunasync.db"
+	cfg.Files.DBType = "bolt"
 
 	if cfgFile != "" {
 		if _, err := toml.DecodeFile(cfgFile, cfg); err != nil {
@@ -60,6 +62,9 @@ func loadConfig(cfgFile string, c *cli.Context) (*Config, error) {
 	if c.String("db-file") != "" {
 		cfg.Files.DBFile = c.String("db-file")
 	}
+	if c.String("db-type") != "" {
+		cfg.Files.DBFile = c.String("db-type")
+	}
 
 	return cfg, nil
 }

+ 56 - 6
manager/db.go

@@ -1,13 +1,35 @@
 package manager
 
-import "github.com/boltdb/bolt"
+import (
+	"fmt"
+	"github.com/boltdb/bolt"
+)
 
 type dbAdapter interface {
-	GetWorker(workerID string)
-	UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus)
-	GetMirrorStatus(workerID, mirrorID string)
-	GetMirrorStatusList(workerID string)
-	Close()
+	ListWorkers() ([]worker, error)
+	GetWorker(workerID string) (worker, error)
+	CreateWorker(w worker) (worker, error)
+	UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error)
+	GetMirrorStatus(workerID, mirrorID string) (mirrorStatus, error)
+	ListMirrorStatus(workerID string) ([]mirrorStatus, error)
+	ListAllMirrorStatus() ([]mirrorStatus, error)
+	Close() error
+}
+
+func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
+	if dbType == "bolt" {
+		innerDB, err := bolt.Open(dbFile, 0600, nil)
+		if err != nil {
+			return nil, err
+		}
+		db := boltAdapter{
+			db:     innerDB,
+			dbFile: dbFile,
+		}
+		return &db, nil
+	}
+	// unsupported db-type
+	return nil, fmt.Errorf("unsupported db-type: %s", dbType)
 }
 
 type boltAdapter struct {
@@ -15,6 +37,34 @@ type boltAdapter struct {
 	dbFile string
 }
 
+func (b *boltAdapter) ListWorkers() ([]worker, error) {
+	return []worker{}, nil
+}
+
+func (b *boltAdapter) GetWorker(workerID string) (worker, error) {
+	return worker{}, nil
+}
+
+func (b *boltAdapter) CreateWorker(w worker) (worker, error) {
+	return worker{}, nil
+}
+
+func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error) {
+	return mirrorStatus{}, nil
+}
+
+func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (mirrorStatus, error) {
+	return mirrorStatus{}, nil
+}
+
+func (b *boltAdapter) ListMirrorStatus(workerID string) ([]mirrorStatus, error) {
+	return []mirrorStatus{}, nil
+}
+
+func (b *boltAdapter) ListAllMirrorStatus() ([]mirrorStatus, error) {
+	return []mirrorStatus{}, nil
+}
+
 func (b *boltAdapter) Close() error {
 	if b.db != nil {
 		return b.db.Close()

+ 198 - 14
manager/server.go

@@ -1,42 +1,226 @@
 package manager
 
 import (
+	"fmt"
+	"github.com/gin-gonic/gin"
+	. "github.com/tuna/tunasync/internal"
 	"net/http"
+	"sync"
+	"time"
+)
 
-	"github.com/gin-gonic/gin"
+const (
+	maxQueuedCmdNum = 3
+	cmdPollTime     = 10 * time.Second
+)
+
+const (
+	_errorKey = "error"
+	_infoKey  = "message"
 )
 
 type worker struct {
 	// worker name
-	name string
-	// url to connect to worker
-	url string
+	id string
 	// session token
 	token string
 }
 
-func makeHTTPServer(debug bool) *gin.Engine {
+var (
+	workerChannelMu sync.RWMutex
+	workerChannels  = make(map[string]chan WorkerCmd)
+)
+
+type managerServer struct {
+	*gin.Engine
+	adapter dbAdapter
+}
+
+// listAllJobs repond with all jobs of specified workers
+func (s *managerServer) listAllJobs(c *gin.Context) {
+	mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
+	if err != nil {
+		err := fmt.Errorf("failed to list all mirror status: %s",
+			err.Error(),
+		)
+		c.Error(err)
+		s.returnErrJSON(c, http.StatusInternalServerError, err)
+		return
+	}
+	c.JSON(http.StatusOK, mirrorStatusList)
+}
+
+// listWrokers respond with informations of all the workers
+func (s *managerServer) listWorkers(c *gin.Context) {
+	var workerInfos []WorkerInfoMsg
+	workers, err := s.adapter.ListWorkers()
+	if err != nil {
+		err := fmt.Errorf("failed to list workers: %s",
+			err.Error(),
+		)
+		c.Error(err)
+		s.returnErrJSON(c, http.StatusInternalServerError, err)
+		return
+	}
+	for _, w := range workers {
+		workerInfos = append(workerInfos,
+			WorkerInfoMsg{w.id})
+	}
+	c.JSON(http.StatusOK, workerInfos)
+}
+
+// registerWorker register an newly-online worker
+func (s *managerServer) registerWorker(c *gin.Context) {
+	var _worker worker
+	c.BindJSON(&_worker)
+	newWorker, err := s.adapter.CreateWorker(_worker)
+	if err != nil {
+		err := fmt.Errorf("failed to register worker: %s",
+			err.Error(),
+		)
+		c.Error(err)
+		s.returnErrJSON(c, http.StatusInternalServerError, err)
+		return
+	}
+	// create workerCmd channel for this worker
+	workerChannelMu.Lock()
+	defer workerChannelMu.Unlock()
+	workerChannels[_worker.id] = make(chan WorkerCmd, maxQueuedCmdNum)
+	c.JSON(http.StatusOK, newWorker)
+}
+
+// listJobsOfWorker respond with all the jobs of the specified worker
+func (s *managerServer) listJobsOfWorker(c *gin.Context) {
+	workerID := c.Param("id")
+	mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
+	if err != nil {
+		err := fmt.Errorf("failed to list jobs of worker %s: %s",
+			workerID, err.Error(),
+		)
+		c.Error(err)
+		s.returnErrJSON(c, http.StatusInternalServerError, err)
+		return
+	}
+	c.JSON(http.StatusOK, mirrorStatusList)
+}
+
+func (s *managerServer) returnErrJSON(c *gin.Context, code int, err error) {
+	c.JSON(code, gin.H{
+		_errorKey: err.Error(),
+	})
+}
+
+func (s *managerServer) updateJobOfWorker(c *gin.Context) {
+	workerID := c.Param("id")
+	var status mirrorStatus
+	c.BindJSON(&status)
+	mirrorName := status.Name
+	newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
+	if err != nil {
+		err := fmt.Errorf("failed to update job %s of worker %s: %s",
+			mirrorName, workerID, err.Error(),
+		)
+		c.Error(err)
+		s.returnErrJSON(c, http.StatusInternalServerError, err)
+		return
+	}
+	c.JSON(http.StatusOK, newStatus)
+}
+
+func (s *managerServer) handleClientCmd(c *gin.Context) {
+	workerChannelMu.RLock()
+	defer workerChannelMu.RUnlock()
+	var clientCmd ClientCmd
+	c.BindJSON(&clientCmd)
+	// TODO: decide which worker should do this mirror when WorkerID is null string
+	workerID := clientCmd.WorkerID
+	if workerID == "" {
+		// TODO: decide which worker should do this mirror when WorkerID is null string
+		logger.Error("handleClientCmd case workerID == \" \" not implemented yet")
+		c.AbortWithStatus(http.StatusInternalServerError)
+		return
+	}
+
+	workerChannel, ok := workerChannels[workerID]
+	if !ok {
+		err := fmt.Errorf("worker %s is not registered yet", workerID)
+		s.returnErrJSON(c, http.StatusBadRequest, err)
+		return
+	}
+	// parse client cmd into worker cmd
+	workerCmd := WorkerCmd{
+		Cmd:      clientCmd.Cmd,
+		MirrorID: clientCmd.MirrorID,
+		Args:     clientCmd.Args,
+	}
+	select {
+	case workerChannel <- workerCmd:
+		// successfully insert command to channel
+		c.JSON(http.StatusOK, struct{}{})
+	default:
+		// pending commands for that worker exceed
+		// the maxQueuedCmdNum threshold
+		err := fmt.Errorf("pending commands for worker %s exceed"+
+			"the %d threshold, the command is dropped",
+			workerID, maxQueuedCmdNum)
+		c.Error(err)
+		s.returnErrJSON(c, http.StatusServiceUnavailable, err)
+		return
+	}
+}
+
+func (s *managerServer) getCmdOfWorker(c *gin.Context) {
+	workerID := c.Param("id")
+	workerChannelMu.RLock()
+	defer workerChannelMu.RUnlock()
+
+	workerChannel := workerChannels[workerID]
+	for {
+		select {
+		case _ = <-workerChannel:
+			// TODO: push new command to worker client
+			continue
+		case <-time.After(cmdPollTime):
+			// time limit exceeded, close the connection
+			break
+		}
+	}
+}
+
+func (s *managerServer) setDBAdapter(adapter dbAdapter) {
+	s.adapter = adapter
+}
+
+func makeHTTPServer(debug bool) *managerServer {
+	// create gin engine
 	if !debug {
 		gin.SetMode(gin.ReleaseMode)
 	}
-	r := gin.Default()
-	r.GET("/ping", func(c *gin.Context) {
+	s := &managerServer{
+		gin.Default(),
+		nil,
+	}
+	s.GET("/ping", func(c *gin.Context) {
 		c.JSON(http.StatusOK, gin.H{"msg": "pong"})
 	})
 	// list jobs, status page
-	r.GET("/jobs", func(c *gin.Context) {})
+	s.GET("/jobs", s.listAllJobs)
+
+	// list workers
+	s.GET("/workers", s.listWorkers)
 	// worker online
-	r.POST("/workers/:name", func(c *gin.Context) {})
+	s.POST("/workers/:id", s.registerWorker)
+
 	// get job list
-	r.GET("/workers/:name/jobs", func(c *gin.Context) {})
+	s.GET("/workers/:id/jobs", s.listJobsOfWorker)
 	// post job status
-	r.POST("/workers/:name/jobs/:job", func(c *gin.Context) {})
+	s.POST("/workers/:id/jobs/:job", s.updateJobOfWorker)
 
 	// worker command polling
-	r.GET("/workers/:name/cmd_stream", func(c *gin.Context) {})
+	s.GET("/workers/:id/cmd_stream", s.getCmdOfWorker)
 
 	// for tunasynctl to post commands
-	r.POST("/cmd/", func(c *gin.Context) {})
+	s.POST("/cmd/", s.handleClientCmd)
 
-	return r
+	return s
 }

+ 67 - 0
manager/server_test.go

@@ -6,12 +6,79 @@ import (
 	"io/ioutil"
 	"math/rand"
 	"net/http"
+	"strings"
 	"testing"
 	"time"
 
 	. "github.com/smartystreets/goconvey/convey"
 )
 
+type mockDBAdapter struct {
+	workerStore map[string]worker
+	statusStore map[string]mirrorStatus
+}
+
+func (b *mockDBAdapter) ListWorkers() ([]worker, error) {
+	workers := make([]worker, len(b.workerStore))
+	idx := 0
+	for _, w := range b.workerStore {
+		workers[idx] = w
+		idx++
+	}
+	return workers, nil
+}
+
+func (b *mockDBAdapter) GetWorker(workerID string) (worker, error) {
+	w, ok := b.workerStore[workerID]
+	if !ok {
+		return worker{}, fmt.Errorf("inexist workerId")
+	}
+	return w, nil
+}
+
+func (b *mockDBAdapter) CreateWorker(w worker) (worker, error) {
+	_, ok := b.workerStore[w.id]
+	if ok {
+		return worker{}, fmt.Errorf("duplicate worker name")
+	}
+	b.workerStore[w.id] = w
+	return w, nil
+}
+
+func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (mirrorStatus, error) {
+	// TODO: need to check worker exist first
+	id := workerID + "/" + mirrorID
+	status, ok := b.statusStore[id]
+	if !ok {
+		return mirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
+	}
+	return status, nil
+}
+
+func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error) {
+	id := workerID + "/" + mirrorID
+	b.statusStore[id] = status
+	return status, nil
+}
+
+func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]mirrorStatus, error) {
+	var mirrorStatusList []mirrorStatus
+	for k, v := range b.statusStore {
+		if wID := strings.Split(k, "/")[1]; wID == workerID {
+			mirrorStatusList = append(mirrorStatusList, v)
+		}
+	}
+	return mirrorStatusList, nil
+}
+
+func (b *mockDBAdapter) ListAllMirrorStatus() ([]mirrorStatus, error) {
+	var mirrorStatusList []mirrorStatus
+	for _, v := range b.statusStore {
+		mirrorStatusList = append(mirrorStatusList, v)
+	}
+	return mirrorStatusList, nil
+}
+
 func TestHTTPServer(t *testing.T) {
 	Convey("HTTP server should work", t, func() {
 		s := makeHTTPServer(false)

+ 4 - 0
manager/status.go

@@ -12,6 +12,8 @@ import (
 
 type mirrorStatus struct {
 	Name       string
+	Worker     string
+	IsMaster   bool
 	Status     SyncStatus
 	LastUpdate time.Time
 	Upstream   string
@@ -21,6 +23,8 @@ type mirrorStatus struct {
 func (s mirrorStatus) MarshalJSON() ([]byte, error) {
 	m := map[string]interface{}{
 		"name":           s.Name,
+		"worker":         s.Worker,
+		"is_master":      s.IsMaster,
 		"status":         s.Status,
 		"last_update":    s.LastUpdate.Format("2006-01-02 15:04:05"),
 		"last_update_ts": fmt.Sprintf("%d", s.LastUpdate.Unix()),