Browse Source

New feature: remove a worker with tunasynctl

Fix #78
Yuxiang Zhang 7 years ago
parent
commit
7e601d9fff
5 changed files with 143 additions and 3 deletions
  1. 58 0
      cmd/tunasynctl/tunasynctl.go
  2. 14 0
      manager/db.go
  3. 21 3
      manager/db_test.go
  4. 18 0
      manager/server.go
  5. 32 0
      manager/server_test.go

+ 58 - 0
cmd/tunasynctl/tunasynctl.go

@@ -236,6 +236,52 @@ func updateMirrorSize(c *cli.Context) error {
 	return nil
 	return nil
 }
 }
 
 
+func removeWorker(c *cli.Context) error {
+	args := c.Args()
+	if len(args) != 0 {
+		return cli.NewExitError("Usage: tunasynctl -w <worker-id>", 1)
+	}
+	workerID := c.String("worker")
+	if len(workerID) == 0 {
+		return cli.NewExitError("Please specify the <worker-id>", 1)
+	}
+	url := fmt.Sprintf("%s/workers/%s", baseURL, workerID)
+
+	req, err := http.NewRequest("DELETE", url, nil)
+	if err != nil {
+		logger.Panicf("Invalid HTTP Request: %s", err.Error())
+	}
+	resp, err := client.Do(req)
+
+	if err != nil {
+		return cli.NewExitError(
+			fmt.Sprintf("Failed to send request to manager: %s", err.Error()), 1)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		body, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return cli.NewExitError(
+				fmt.Sprintf("Failed to parse response: %s", err.Error()),
+				1)
+		}
+
+		return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
+			" command: HTTP status code is not 200: %s", body),
+			1)
+	}
+
+	res := map[string]string{}
+	err = json.NewDecoder(resp.Body).Decode(&res)
+	if res["message"] == "deleted" {
+		logger.Info("Successfully removed the worker")
+	} else {
+		logger.Info("Failed to remove the worker")
+	}
+	return nil
+}
+
 func flushDisabledJobs(c *cli.Context) error {
 func flushDisabledJobs(c *cli.Context) error {
 	req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil)
 	req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil)
 	if err != nil {
 	if err != nil {
@@ -435,6 +481,18 @@ func main() {
 			Flags:  commonFlags,
 			Flags:  commonFlags,
 			Action: initializeWrapper(listWorkers),
 			Action: initializeWrapper(listWorkers),
 		},
 		},
+		{
+			Name:  "rm-worker",
+			Usage: "Remove a worker",
+			Flags: append(
+				commonFlags,
+				cli.StringFlag{
+					Name:  "worker, w",
+					Usage: "worker-id of the worker to be removed",
+				},
+			),
+			Action: initializeWrapper(removeWorker),
+		},
 		{
 		{
 			Name:  "set-size",
 			Name:  "set-size",
 			Usage: "Set mirror size",
 			Usage: "Set mirror size",

+ 14 - 0
manager/db.go

@@ -14,6 +14,7 @@ type dbAdapter interface {
 	Init() error
 	Init() error
 	ListWorkers() ([]WorkerStatus, error)
 	ListWorkers() ([]WorkerStatus, error)
 	GetWorker(workerID string) (WorkerStatus, error)
 	GetWorker(workerID string) (WorkerStatus, error)
+	DeleteWorker(workerID string) error
 	CreateWorker(w WorkerStatus) (WorkerStatus, error)
 	CreateWorker(w WorkerStatus) (WorkerStatus, error)
 	UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
 	UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
 	GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
 	GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
@@ -95,6 +96,19 @@ func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
 	return
 	return
 }
 }
 
 
+func (b *boltAdapter) DeleteWorker(workerID string) (err error) {
+	err = b.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(_workerBucketKey))
+		v := bucket.Get([]byte(workerID))
+		if v == nil {
+			return fmt.Errorf("invalid workerID %s", workerID)
+		}
+		err := bucket.Delete([]byte(workerID))
+		return err
+	})
+	return
+}
+
 func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
 func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
 	err := b.db.Update(func(tx *bolt.Tx) error {
 	err := b.db.Update(func(tx *bolt.Tx) error {
 		bucket := tx.Bucket([]byte(_workerBucketKey))
 		bucket := tx.Bucket([]byte(_workerBucketKey))

+ 21 - 3
manager/db_test.go

@@ -40,21 +40,39 @@ func TestBoltAdapter(t *testing.T) {
 				So(err, ShouldBeNil)
 				So(err, ShouldBeNil)
 			}
 			}
 
 
-			Convey("get exists worker", func() {
+			Convey("get existent worker", func() {
 				_, err := boltDB.GetWorker(testWorkerIDs[0])
 				_, err := boltDB.GetWorker(testWorkerIDs[0])
 				So(err, ShouldBeNil)
 				So(err, ShouldBeNil)
 			})
 			})
 
 
-			Convey("list exist worker", func() {
+			Convey("list existent workers", func() {
 				ws, err := boltDB.ListWorkers()
 				ws, err := boltDB.ListWorkers()
 				So(err, ShouldBeNil)
 				So(err, ShouldBeNil)
 				So(len(ws), ShouldEqual, 2)
 				So(len(ws), ShouldEqual, 2)
 			})
 			})
 
 
-			Convey("get inexist worker", func() {
+			Convey("get non-existent worker", func() {
 				_, err := boltDB.GetWorker("invalid workerID")
 				_, err := boltDB.GetWorker("invalid workerID")
 				So(err, ShouldNotBeNil)
 				So(err, ShouldNotBeNil)
 			})
 			})
+
+			Convey("delete existent worker", func() {
+				err := boltDB.DeleteWorker(testWorkerIDs[0])
+				So(err, ShouldBeNil)
+				_, err = boltDB.GetWorker(testWorkerIDs[0])
+				So(err, ShouldNotBeNil)
+				ws, err := boltDB.ListWorkers()
+				So(err, ShouldBeNil)
+				So(len(ws), ShouldEqual, 1)
+			})
+
+			Convey("delete non-existent worker", func() {
+				err := boltDB.DeleteWorker("invalid workerID")
+				So(err, ShouldNotBeNil)
+				ws, err := boltDB.ListWorkers()
+				So(err, ShouldBeNil)
+				So(len(ws), ShouldEqual, 2)
+			})
 		})
 		})
 
 
 		Convey("update mirror status", func() {
 		Convey("update mirror status", func() {

+ 18 - 0
manager/server.go

@@ -84,6 +84,8 @@ func GetTUNASyncManager(cfg *Config) *Manager {
 	// workerID should be valid in this route group
 	// workerID should be valid in this route group
 	workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
 	workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
 	{
 	{
+		// delete specified worker
+		workerValidateGroup.DELETE(":id", s.deleteWorker)
 		// get job list
 		// get job list
 		workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
 		workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
 		// post job status
 		// post job status
@@ -159,6 +161,22 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
 	c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
 	c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
 }
 }
 
 
+// deleteWorker deletes one worker by id
+func (s *Manager) deleteWorker(c *gin.Context) {
+	workerID := c.Param("id")
+	err := s.adapter.DeleteWorker(workerID)
+	if err != nil {
+		err := fmt.Errorf("failed to delete worker: %s",
+			err.Error(),
+		)
+		c.Error(err)
+		s.returnErrJSON(c, http.StatusInternalServerError, err)
+		return
+	}
+	logger.Noticef("Worker <%s> deleted", workerID)
+	c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
+}
+
 // listWrokers respond with informations of all the workers
 // listWrokers respond with informations of all the workers
 func (s *Manager) listWorkers(c *gin.Context) {
 func (s *Manager) listWorkers(c *gin.Context) {
 	var workerInfos []WorkerStatus
 	var workerInfos []WorkerStatus

+ 32 - 0
manager/server_test.go

@@ -79,6 +79,33 @@ func TestHTTPServer(t *testing.T) {
 				So(len(actualResponseObj), ShouldEqual, 2)
 				So(len(actualResponseObj), ShouldEqual, 2)
 			})
 			})
 
 
+			Convey("delete an existent worker", func(ctx C) {
+				req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
+				So(err, ShouldBeNil)
+				clt := &http.Client{}
+				resp, err := clt.Do(req)
+				So(err, ShouldBeNil)
+				defer resp.Body.Close()
+				res := map[string]string{}
+				err = json.NewDecoder(resp.Body).Decode(&res)
+				So(err, ShouldBeNil)
+				So(res[_infoKey], ShouldEqual, "deleted")
+			})
+
+			Convey("delete non-existent worker", func(ctx C) {
+				invalidWorker := "test_worker233"
+				req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
+				So(err, ShouldBeNil)
+				clt := &http.Client{}
+				resp, err := clt.Do(req)
+				So(err, ShouldBeNil)
+				defer resp.Body.Close()
+				res := map[string]string{}
+				err = json.NewDecoder(resp.Body).Decode(&res)
+				So(err, ShouldBeNil)
+				So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
+			})
+
 			Convey("flush  disabled jobs", func(ctx C) {
 			Convey("flush  disabled jobs", func(ctx C) {
 				req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
 				req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
 				So(err, ShouldBeNil)
 				So(err, ShouldBeNil)
@@ -323,6 +350,11 @@ func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
 	return w, nil
 	return w, nil
 }
 }
 
 
+func (b *mockDBAdapter) DeleteWorker(workerID string) error {
+	delete(b.workerStore, workerID)
+	return nil
+}
+
 func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
 func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
 	// _, ok := b.workerStore[w.ID]
 	// _, ok := b.workerStore[w.ID]
 	// if ok {
 	// if ok {