소스 검색

Merge pull request #134 from tuna/worker-last-online-register

Worker last online and last register
Yuxiang Zhang 5 년 전
부모
커밋
b578237df8
7개의 변경된 파일35개의 추가작업 그리고 8개의 파일을 삭제
  1. 5 4
      internal/msg.go
  2. 10 0
      manager/db.go
  3. 1 0
      manager/db_test.go
  4. 7 2
      manager/server.go
  5. 9 0
      manager/server_test.go
  6. 2 2
      worker/worker.go
  7. 1 0
      worker/worker_test.go

+ 5 - 4
internal/msg.go

@@ -24,10 +24,11 @@ type MirrorStatus struct {
 // A WorkerStatus is the information struct that describe
 // a worker, and sent from the manager to clients.
 type WorkerStatus struct {
-	ID         string    `json:"id"`
-	URL        string    `json:"url"`         // worker url
-	Token      string    `json:"token"`       // session token
-	LastOnline time.Time `json:"last_online"` // last seen
+	ID           string    `json:"id"`
+	URL          string    `json:"url"`           // worker url
+	Token        string    `json:"token"`         // session token
+	LastOnline   time.Time `json:"last_online"`   // last seen
+	LastRegister time.Time `json:"last_register"` // last register time
 }
 
 type MirrorSchedules struct {

+ 10 - 0
manager/db.go

@@ -17,6 +17,7 @@ type dbAdapter interface {
 	GetWorker(workerID string) (WorkerStatus, error)
 	DeleteWorker(workerID string) error
 	CreateWorker(w WorkerStatus) (WorkerStatus, error)
+	RefreshWorker(workerID string) (WorkerStatus, error)
 	UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
 	GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
 	ListMirrorStatus(workerID string) ([]MirrorStatus, error)
@@ -125,6 +126,15 @@ func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
 	return w, err
 }
 
+func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
+	w, err = b.GetWorker(workerID)
+	if err == nil {
+		w.LastOnline = time.Now()
+		w, err = b.CreateWorker(w)
+	}
+	return w, err
+}
+
 func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
 	id := mirrorID + "/" + workerID
 	err := b.db.Update(func(tx *bolt.Tx) error {

+ 1 - 0
manager/db_test.go

@@ -35,6 +35,7 @@ func TestBoltAdapter(t *testing.T) {
 					ID:         id,
 					Token:      "token_" + id,
 					LastOnline: time.Now(),
+					LastRegister: time.Now(),
 				}
 				w, err = boltDB.CreateWorker(w)
 				So(err, ShouldBeNil)

+ 7 - 2
manager/server.go

@@ -203,8 +203,9 @@ func (s *Manager) listWorkers(c *gin.Context) {
 	for _, w := range workers {
 		workerInfos = append(workerInfos,
 			WorkerStatus{
-				ID:         w.ID,
-				LastOnline: w.LastOnline,
+				ID:           w.ID,
+				LastOnline:   w.LastOnline,
+				LastRegister: w.LastRegister,
 			})
 	}
 	c.JSON(http.StatusOK, workerInfos)
@@ -215,6 +216,7 @@ func (s *Manager) registerWorker(c *gin.Context) {
 	var _worker WorkerStatus
 	c.BindJSON(&_worker)
 	_worker.LastOnline = time.Now()
+	_worker.LastRegister = time.Now()
 	newWorker, err := s.adapter.CreateWorker(_worker)
 	if err != nil {
 		err := fmt.Errorf("failed to register worker: %s",
@@ -268,6 +270,7 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
 		}
 
 		s.rwmu.RLock()
+		s.adapter.RefreshWorker(workerID)
 		curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
 		s.rwmu.RUnlock()
 		if err != nil {
@@ -312,6 +315,7 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
 	}
 
 	s.rwmu.RLock()
+	s.adapter.RefreshWorker(workerID)
 	curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
 	s.rwmu.RUnlock()
 
@@ -374,6 +378,7 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {
 
 	mirrorName := msg.Name
 	s.rwmu.RLock()
+	s.adapter.RefreshWorker(workerID)
 	status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
 	s.rwmu.RUnlock()
 	if err != nil {

+ 9 - 0
manager/server_test.go

@@ -462,6 +462,15 @@ func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
 	return w, nil
 }
 
+func (b *mockDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
+	w, err = b.GetWorker(workerID)
+	if err == nil {
+		w.LastOnline = time.Now()
+		w, err = b.CreateWorker(w)
+	}
+	return w, err
+}
+
 func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
 	id := mirrorID + "/" + workerID
 	status, ok := b.statusStore[id]

+ 2 - 2
worker/worker.go

@@ -61,7 +61,7 @@ func NewTUNASyncWorker(cfg *Config) *Worker {
 
 // Run runs worker forever
 func (w *Worker) Run() {
-	w.registorWorker()
+	w.registerWorker()
 	go w.runHTTPServer()
 	w.runSchedule()
 }
@@ -393,7 +393,7 @@ func (w *Worker) URL() string {
 	return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port)
 }
 
-func (w *Worker) registorWorker() {
+func (w *Worker) registerWorker() {
 	msg := WorkerStatus{
 		ID:  w.Name(),
 		URL: w.URL(),

+ 1 - 0
worker/worker_test.go

@@ -25,6 +25,7 @@ func makeMockManagerServer(recvData chan interface{}) *gin.Engine {
 		var _worker WorkerStatus
 		c.BindJSON(&_worker)
 		_worker.LastOnline = time.Now()
+		_worker.LastRegister = time.Now()
 		recvData <- _worker
 		c.JSON(http.StatusOK, _worker)
 	})