
feat(manager): flush disabled jobs
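
Adds a "flush" subcommand to tunasynctl and a DELETE /jobs/disabled route on the manager, which removes every job whose status is Disabled from the job database.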

bigeagle 9 years ago
parent
commit
ca80dff5cb
6 changed files with 164 additions and 38 deletions
  1. cmd/tunasynctl/tunasynctl.go (+42 -3)
  2. internal/msg.go (+14 -6)
  3. manager/db.go (+21 -0)
  4. manager/db_test.go (+48 -24)
  5. manager/server.go (+22 -4)
  6. manager/server_test.go (+17 -1)

+ 42 - 3
cmd/tunasynctl/tunasynctl.go

@@ -23,9 +23,10 @@ var (
 )
 
 const (
-	listJobsPath    = "/jobs"
-	listWorkersPath = "/workers"
-	cmdPath         = "/cmd"
+	listJobsPath      = "/jobs"
+	listWorkersPath   = "/workers"
+	flushDisabledPath = "/jobs/disabled"
+	cmdPath           = "/cmd"
 
 	systemCfgFile = "/etc/tunasync/ctl.conf"          // system-wide conf
 	userCfgFile   = "$HOME/.config/tunasync/ctl.conf" // user-specific conf
@@ -182,6 +183,38 @@ func listJobs(c *cli.Context) error {
 	return nil
 }
 
+func flushDisabledJobs(c *cli.Context) error {
+	req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil)
+	if err != nil {
+		logger.Panicf("Invalid HTTP Request: %s", err.Error())
+	}
+	resp, err := client.Do(req)
+
+	if err != nil {
+		return cli.NewExitError(
+			fmt.Sprintf("Failed to send request to manager: %s",
+				err.Error()),
+			1)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		body, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return cli.NewExitError(
+				fmt.Sprintf("Failed to parse response: %s", err.Error()),
+				1)
+		}
+
+		return cli.NewExitError(fmt.Sprintf("Failed to send"+
+			" command: HTTP status code is not 200: %s", body),
+			1)
+	}
+
+	logger.Info("Successfully flushed disabled jobs")
+	return nil
+}
+
 func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
 	return func(c *cli.Context) error {
 		var mirrorID string
@@ -335,6 +368,12 @@ func main() {
 				}...),
 			Action: initializeWrapper(listJobs),
 		},
+		{
+			Name:   "flush",
+			Usage:  "Flush disabled jobs",
+			Flags:  commonFlags,
+			Action: initializeWrapper(flushDisabledJobs),
+		},
 		{
 			Name:   "workers",
 			Usage:  "List workers",

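With this in place, running "tunasynctl flush" (it accepts the same common flags as the other subcommands) issues the DELETE request and logs "Successfully flushed disabled jobs" on a 200 response.
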
+ 14 - 6
internal/msg.go

@@ -27,15 +27,23 @@ type WorkerStatus struct {
 	LastOnline time.Time `json:"last_online"` // last seen
 }
 
+// A CmdVerb is an action on a job or a worker
 type CmdVerb uint8
 
 const (
-	CmdStart   CmdVerb = iota
-	CmdStop            // stop syncing keep the job
-	CmdDisable         // disable the job (stops goroutine)
-	CmdRestart         // restart syncing
-	CmdPing            // ensure the goroutine is alive
-	CmdReload          // reload mirror config
+	// CmdStart starts a job
+	CmdStart CmdVerb = iota
+	// CmdStop stops syncing, but keeps the job
+	CmdStop
+	// CmdDisable disables the job (stops the goroutine)
+	CmdDisable
+	// CmdRestart restarts a syncing job
+	CmdRestart
+	// CmdPing ensures the goroutine is alive
+	CmdPing
+
+	// CmdReload tells a worker to reload mirror config
+	CmdReload
 )
 
 func (c CmdVerb) String() string {

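The body of String() is not shown in this hunk; presumably it maps each verb to its command name. A minimal sketch, assuming the conventional switch form (the returned strings are assumptions, not taken from this diff):

func (c CmdVerb) String() string {
	switch c {
	case CmdStart:
		return "start"
	case CmdStop:
		return "stop"
	case CmdDisable:
		return "disable"
	case CmdRestart:
		return "restart"
	case CmdPing:
		return "ping"
	case CmdReload:
		return "reload"
	}
	return "unknown"
}
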
+ 21 - 0
manager/db.go

@@ -19,6 +19,7 @@ type dbAdapter interface {
 	GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
 	ListMirrorStatus(workerID string) ([]MirrorStatus, error)
 	ListAllMirrorStatus() ([]MirrorStatus, error)
+	FlushDisabledJobs() error
 	Close() error
 }
 
@@ -170,6 +171,26 @@ func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
 	return
 }
 
+func (b *boltAdapter) FlushDisabledJobs() (err error) {
+	err = b.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(_statusBucketKey))
+		c := bucket.Cursor()
+		var m MirrorStatus
+		for k, v := c.First(); k != nil; k, v = c.Next() {
+			jsonErr := json.Unmarshal(v, &m)
+			if jsonErr != nil {
+				// accumulate unmarshal errors; err may still be nil here
+				if err == nil {
+					err = jsonErr
+				} else {
+					err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
+				}
+				continue
+			}
+			if m.Status == Disabled {
+				// don't let a successful Delete clobber an earlier error
+				if delErr := c.Delete(); delErr != nil {
+					err = delErr
+				}
+			}
+		}
+		return err
+	})
+	return
+}
+
 func (b *boltAdapter) Close() error {
 	if b.db != nil {
 		return b.db.Close()

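FlushDisabledJobs relies on bolt allowing Cursor.Delete inside a read-write (db.Update) transaction, so entries can be removed while iterating. A standalone sketch of the same pattern; the helper name and predicate are illustrative, not part of tunasync:

package manager

import "github.com/boltdb/bolt"

// flushMatching deletes every entry in bucket for which keep returns false.
func flushMatching(db *bolt.DB, bucket []byte, keep func(v []byte) bool) error {
	return db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucket)
		if b == nil {
			return nil // nothing to flush
		}
		c := b.Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			if !keep(v) {
				if err := c.Delete(); err != nil {
					return err
				}
			}
		}
		return nil
	})
}
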
+ 48 - 24
manager/db_test.go

@@ -58,33 +58,46 @@ func TestBoltAdapter(t *testing.T) {
 		})
 
 		Convey("update mirror status", func() {
-			status1 := MirrorStatus{
-				Name:       "arch-sync1",
-				Worker:     testWorkerIDs[0],
-				IsMaster:   true,
-				Status:     Success,
-				LastUpdate: time.Now(),
-				Upstream:   "mirrors.tuna.tsinghua.edu.cn",
-				Size:       "3GB",
-			}
-			status2 := MirrorStatus{
-				Name:       "arch-sync2",
-				Worker:     testWorkerIDs[1],
-				IsMaster:   true,
-				Status:     Success,
-				LastUpdate: time.Now(),
-				Upstream:   "mirrors.tuna.tsinghua.edu.cn",
-				Size:       "4GB",
+			status := []MirrorStatus{
+				MirrorStatus{
+					Name:       "arch-sync1",
+					Worker:     testWorkerIDs[0],
+					IsMaster:   true,
+					Status:     Success,
+					LastUpdate: time.Now(),
+					Upstream:   "mirrors.tuna.tsinghua.edu.cn",
+					Size:       "3GB",
+				},
+				MirrorStatus{
+					Name:       "arch-sync2",
+					Worker:     testWorkerIDs[1],
+					IsMaster:   true,
+					Status:     Disabled,
+					LastUpdate: time.Now(),
+					Upstream:   "mirrors.tuna.tsinghua.edu.cn",
+					Size:       "4GB",
+				},
+				MirrorStatus{
+					Name:       "arch-sync3",
+					Worker:     testWorkerIDs[1],
+					IsMaster:   true,
+					Status:     Success,
+					LastUpdate: time.Now(),
+					Upstream:   "mirrors.tuna.tsinghua.edu.cn",
+					Size:       "4GB",
+				},
 			}
 
-			_, err := boltDB.UpdateMirrorStatus(status1.Worker, status1.Name, status1)
-			_, err = boltDB.UpdateMirrorStatus(status2.Worker, status2.Name, status2)
-			So(err, ShouldBeNil)
+			for _, s := range status {
+				_, err := boltDB.UpdateMirrorStatus(s.Worker, s.Name, s)
+				So(err, ShouldBeNil)
+			}
 
 			Convey("get mirror status", func() {
-				m, err := boltDB.GetMirrorStatus(testWorkerIDs[0], status1.Name)
+				m, err := boltDB.GetMirrorStatus(testWorkerIDs[0], status[0].Name)
 				So(err, ShouldBeNil)
-				expectedJSON, err := json.Marshal(status1)
+				expectedJSON, err := json.Marshal(status[0])
 				So(err, ShouldBeNil)
 				actualJSON, err := json.Marshal(m)
 				So(err, ShouldBeNil)
@@ -94,7 +107,7 @@ func TestBoltAdapter(t *testing.T) {
 			Convey("list mirror status", func() {
 				ms, err := boltDB.ListMirrorStatus(testWorkerIDs[0])
 				So(err, ShouldBeNil)
-				expectedJSON, err := json.Marshal([]MirrorStatus{status1})
+				expectedJSON, err := json.Marshal([]MirrorStatus{status[0]})
 				So(err, ShouldBeNil)
 				actualJSON, err := json.Marshal(ms)
 				So(err, ShouldBeNil)
@@ -104,13 +117,24 @@ func TestBoltAdapter(t *testing.T) {
 			Convey("list all mirror status", func() {
 				ms, err := boltDB.ListAllMirrorStatus()
 				So(err, ShouldBeNil)
-				expectedJSON, err := json.Marshal([]MirrorStatus{status1, status2})
+				expectedJSON, err := json.Marshal(status)
 				So(err, ShouldBeNil)
 				actualJSON, err := json.Marshal(ms)
 				So(err, ShouldBeNil)
 				So(string(actualJSON), ShouldEqual, string(expectedJSON))
 			})
 
+			Convey("flush disabled jobs", func() {
+				ms, err := boltDB.ListAllMirrorStatus()
+				So(err, ShouldBeNil)
+				So(len(ms), ShouldEqual, 3)
+				err = boltDB.FlushDisabledJobs()
+				So(err, ShouldBeNil)
+				ms, err = boltDB.ListAllMirrorStatus()
+				So(err, ShouldBeNil)
+				So(len(ms), ShouldEqual, 2)
+			})
+
 		})
 
 	})

+ 22 - 4
manager/server.go

@@ -72,6 +72,8 @@ func GetTUNASyncManager(cfg *Config) *Manager {
 	})
 	// list jobs, status page
 	s.engine.GET("/jobs", s.listAllJobs)
+	// flush disabled jobs
+	s.engine.DELETE("/jobs/disabled", s.flushDisabledJobs)
 
 	// list workers
 	s.engine.GET("/workers", s.listWorkers)
@@ -80,10 +82,12 @@ func GetTUNASyncManager(cfg *Config) *Manager {
 
 	// workerID should be valid in this route group
 	workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
-	// get job list
-	workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
-	// post job status
-	workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
+	{
+		// get job list
+		workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
+		// post job status
+		workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
+	}
 
 	// for tunasynctl to post commands
 	s.engine.POST("/cmd", s.handleClientCmd)
@@ -139,6 +143,20 @@ func (s *Manager) listAllJobs(c *gin.Context) {
 	c.JSON(http.StatusOK, webMirStatusList)
 }
 
+// flushDisabledJobs deletes all jobs that are marked as disabled
+func (s *Manager) flushDisabledJobs(c *gin.Context) {
+	err := s.adapter.FlushDisabledJobs()
+	if err != nil {
+		err := fmt.Errorf("failed to flush disabled jobs: %s",
+			err.Error(),
+		)
+		c.Error(err)
+		s.returnErrJSON(c, http.StatusInternalServerError, err)
+		return
+	}
+	c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
+}
+
 // listWorkers responds with information about all the workers
 func (s *Manager) listWorkers(c *gin.Context) {
 	var workerInfos []WorkerStatus

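The braces added around workerValidateGroup are a common gin idiom: a bare Go block that visually groups the routes registered on the group. They introduce a new lexical scope but do not change behavior.
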
+ 17 - 1
manager/server_test.go

@@ -70,7 +70,6 @@ func TestHTTPServer(t *testing.T) {
 			So(resp.StatusCode, ShouldEqual, http.StatusOK)
 
 			Convey("list all workers", func(ctx C) {
-				So(err, ShouldBeNil)
 				resp, err := http.Get(baseURL + "/workers")
 				So(err, ShouldBeNil)
 				defer resp.Body.Close()
@@ -80,6 +79,19 @@ func TestHTTPServer(t *testing.T) {
 				So(len(actualResponseObj), ShouldEqual, 2)
 			})
 
+			Convey("flush disabled jobs", func(ctx C) {
+				req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
+				So(err, ShouldBeNil)
+				clt := &http.Client{}
+				resp, err := clt.Do(req)
+				So(err, ShouldBeNil)
+				defer resp.Body.Close()
+				res := map[string]string{}
+				err = json.NewDecoder(resp.Body).Decode(&res)
+				So(err, ShouldBeNil)
+				So(res[_infoKey], ShouldEqual, "flushed")
+			})
+
 			Convey("update mirror status of a existed worker", func(ctx C) {
 				status := MirrorStatus{
 					Name:     "arch-sync1",
@@ -295,6 +307,10 @@ func (b *mockDBAdapter) Close() error {
 	return nil
 }
 
+func (b *mockDBAdapter) FlushDisabledJobs() error {
+	return nil
+}
+
 func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine {
 	r := gin.Default()
 	r.GET("/ping", func(c *gin.Context) {