123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- package worker
- import (
- "net/http"
- "strconv"
- "testing"
- "time"
- "github.com/gin-gonic/gin"
- . "github.com/smartystreets/goconvey/convey"
- . "github.com/tuna/tunasync/internal"
- )
- type workTestFunc func(*Worker)
- var managerPort = 5001
- var workerPort = 5002
- func makeMockManagerServer(recvData chan interface{}) *gin.Engine {
- r := gin.Default()
- r.GET("/ping", func(c *gin.Context) {
- c.JSON(http.StatusOK, gin.H{"_infoKey": "pong"})
- })
- r.POST("/workers", func(c *gin.Context) {
- var _worker WorkerStatus
- c.BindJSON(&_worker)
- _worker.LastOnline = time.Now()
- _worker.LastRegister = time.Now()
- recvData <- _worker
- c.JSON(http.StatusOK, _worker)
- })
- r.POST("/workers/dut/schedules", func(c *gin.Context) {
- var _sch MirrorSchedules
- c.BindJSON(&_sch)
- recvData <- _sch
- c.JSON(http.StatusOK, empty{})
- })
- r.POST("/workers/dut/jobs/:job", func(c *gin.Context) {
- var status MirrorStatus
- c.BindJSON(&status)
- recvData <- status
- c.JSON(http.StatusOK, status)
- })
- r.GET("/workers/dut/jobs", func(c *gin.Context) {
- mirrorStatusList := []MirrorStatus{}
- c.JSON(http.StatusOK, mirrorStatusList)
- })
- return r
- }
- func startWorkerThenStop(cfg *Config, tester workTestFunc) {
- exitedChan := make(chan int)
- w := NewTUNASyncWorker(cfg)
- So(w, ShouldNotBeNil)
- go func() {
- w.Run()
- exitedChan <- 1
- }()
- tester(w)
- w.Halt()
- select {
- case exited := <-exitedChan:
- So(exited, ShouldEqual, 1)
- case <-time.After(2 * time.Second):
- So(0, ShouldEqual, 1)
- }
- }
- func sendCommandToWorker(workerURL string, httpClient *http.Client, cmd CmdVerb, mirror string) {
- workerCmd := WorkerCmd{
- Cmd: cmd,
- MirrorID: mirror,
- }
- logger.Debugf("POST to %s with cmd %s", workerURL, cmd)
- _, err := PostJSON(workerURL, workerCmd, httpClient)
- So(err, ShouldBeNil)
- }
- func TestWorker(t *testing.T) {
- InitLogger(false, true, false)
- recvDataChan := make(chan interface{})
- _s := makeMockManagerServer(recvDataChan)
- httpServer := &http.Server{
- Addr: "localhost:" + strconv.Itoa(managerPort),
- Handler: _s,
- ReadTimeout: 2 * time.Second,
- WriteTimeout: 2 * time.Second,
- }
- go func() {
- err := httpServer.ListenAndServe()
- So(err, ShouldBeNil)
- }()
- // Wait for http server starting
- time.Sleep(500 * time.Millisecond)
- Convey("Worker should work", t, func(ctx C) {
- httpClient, err := CreateHTTPClient("")
- So(err, ShouldBeNil)
- workerPort++
- workerCfg := Config{
- Global: globalConfig{
- Name: "dut",
- LogDir: "/tmp",
- MirrorDir: "/tmp",
- Concurrent: 2,
- Interval: 1,
- },
- Server: serverConfig{
- Hostname: "localhost",
- Addr: "127.0.0.1",
- Port: workerPort,
- },
- Manager: managerConfig{
- APIBase: "http://localhost:" + strconv.Itoa(managerPort),
- },
- }
- logger.Debugf("worker port %d", workerPort)
- Convey("with no job", func(ctx C) {
- dummyTester := func(*Worker) {
- registered := false
- for {
- select {
- case data := <-recvDataChan:
- if reg, ok := data.(WorkerStatus); ok {
- So(reg.ID, ShouldEqual, "dut")
- registered = true
- time.Sleep(500 * time.Millisecond)
- sendCommandToWorker(reg.URL, httpClient, CmdStart, "foobar")
- } else if sch, ok := data.(MirrorSchedules); ok {
- So(len(sch.Schedules), ShouldEqual, 0)
- }
- case <-time.After(2 * time.Second):
- So(registered, ShouldBeTrue)
- return
- }
- }
- }
- startWorkerThenStop(&workerCfg, dummyTester)
- })
- Convey("with one job", func(ctx C) {
- workerCfg.Mirrors = []mirrorConfig{
- mirrorConfig{
- Name: "job-ls",
- Provider: provCommand,
- Command: "ls",
- },
- }
- dummyTester := func(*Worker) {
- url := ""
- jobRunning := false
- lastStatus := SyncStatus(None)
- for {
- select {
- case data := <-recvDataChan:
- if reg, ok := data.(WorkerStatus); ok {
- So(reg.ID, ShouldEqual, "dut")
- url = reg.URL
- time.Sleep(500 * time.Millisecond)
- sendCommandToWorker(url, httpClient, CmdStart, "job-ls")
- } else if sch, ok := data.(MirrorSchedules); ok {
- if !jobRunning {
- So(len(sch.Schedules), ShouldEqual, 1)
- So(sch.Schedules[0].MirrorName, ShouldEqual, "job-ls")
- So(sch.Schedules[0].NextSchedule,
- ShouldHappenBetween,
- time.Now().Add(-2*time.Second),
- time.Now().Add(1*time.Minute))
- }
- } else if status, ok := data.(MirrorStatus); ok {
- logger.Noticef("Job %s status %s", status.Name, status.Status.String())
- jobRunning = status.Status == PreSyncing || status.Status == Syncing
- So(status.Status, ShouldNotEqual, Failed)
- lastStatus = status.Status
- }
- case <-time.After(2 * time.Second):
- So(url, ShouldNotEqual, "")
- So(jobRunning, ShouldBeFalse)
- So(lastStatus, ShouldEqual, Success)
- return
- }
- }
- }
- startWorkerThenStop(&workerCfg, dummyTester)
- })
- Convey("with several jobs", func(ctx C) {
- workerCfg.Mirrors = []mirrorConfig{
- mirrorConfig{
- Name: "job-ls-1",
- Provider: provCommand,
- Command: "ls",
- },
- mirrorConfig{
- Name: "job-fail",
- Provider: provCommand,
- Command: "non-existent-command-xxxx",
- },
- mirrorConfig{
- Name: "job-ls-2",
- Provider: provCommand,
- Command: "ls",
- },
- }
- dummyTester := func(*Worker) {
- url := ""
- lastStatus := make(map[string]SyncStatus)
- nextSch := make(map[string]time.Time)
- for {
- select {
- case data := <-recvDataChan:
- if reg, ok := data.(WorkerStatus); ok {
- So(reg.ID, ShouldEqual, "dut")
- url = reg.URL
- time.Sleep(500 * time.Millisecond)
- sendCommandToWorker(url, httpClient, CmdStart, "job-fail")
- sendCommandToWorker(url, httpClient, CmdStart, "job-ls-1")
- sendCommandToWorker(url, httpClient, CmdStart, "job-ls-2")
- } else if sch, ok := data.(MirrorSchedules); ok {
- //So(len(sch.Schedules), ShouldEqual, 3)
- for _, item := range sch.Schedules {
- nextSch[item.MirrorName] = item.NextSchedule
- }
- } else if status, ok := data.(MirrorStatus); ok {
- logger.Noticef("Job %s status %s", status.Name, status.Status.String())
- jobRunning := status.Status == PreSyncing || status.Status == Syncing
- if !jobRunning {
- if status.Name == "job-fail" {
- So(status.Status, ShouldEqual, Failed)
- } else {
- So(status.Status, ShouldNotEqual, Failed)
- }
- }
- lastStatus[status.Name] = status.Status
- }
- case <-time.After(2 * time.Second):
- So(len(lastStatus), ShouldEqual, 3)
- So(len(nextSch), ShouldEqual, 3)
- return
- }
- }
- }
- startWorkerThenStop(&workerCfg, dummyTester)
- })
- })
- }
|