worker_test.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package worker
  2. import (
  3. "net/http"
  4. "strconv"
  5. "testing"
  6. "time"
  7. "github.com/gin-gonic/gin"
  8. . "github.com/smartystreets/goconvey/convey"
  9. . "github.com/tuna/tunasync/internal"
  10. )
  11. type workTestFunc func(*Worker)
  12. func makeMockManagerServer(recvData chan interface{}) *gin.Engine {
  13. r := gin.Default()
  14. r.GET("/ping", func(c *gin.Context) {
  15. c.JSON(http.StatusOK, gin.H{"_infoKey": "pong"})
  16. })
  17. r.POST("/workers", func(c *gin.Context) {
  18. var _worker WorkerStatus
  19. c.BindJSON(&_worker)
  20. _worker.LastOnline = time.Now()
  21. recvData <- _worker
  22. c.JSON(http.StatusOK, _worker)
  23. })
  24. r.POST("/workers/dut/schedules", func(c *gin.Context) {
  25. var _sch MirrorSchedules
  26. c.BindJSON(&_sch)
  27. recvData <- _sch
  28. c.JSON(http.StatusOK, empty{})
  29. })
  30. r.GET("/workers/dut/jobs", func(c *gin.Context) {
  31. mirrorStatusList := []MirrorStatus{}
  32. c.JSON(http.StatusOK, mirrorStatusList)
  33. })
  34. return r
  35. }
  36. func startWorkerThenStop(cfg *Config, tester workTestFunc) {
  37. exitedChan := make(chan int)
  38. w := NewTUNASyncWorker(cfg)
  39. So(w, ShouldNotBeNil)
  40. go func() {
  41. w.Run()
  42. exitedChan <- 1
  43. }()
  44. tester(w)
  45. w.Halt()
  46. select {
  47. case exited := <-exitedChan:
  48. So(exited, ShouldEqual, 1)
  49. case <-time.After(2 * time.Second):
  50. So(0, ShouldEqual, 1)
  51. }
  52. }
  53. func TestWorker(t *testing.T) {
  54. managerPort := 5001
  55. InitLogger(false, true, false)
  56. recvDataChan := make(chan interface{})
  57. _s := makeMockManagerServer(recvDataChan)
  58. httpServer := &http.Server{
  59. Addr: "localhost:" + strconv.Itoa(managerPort),
  60. Handler: _s,
  61. ReadTimeout: 2 * time.Second,
  62. WriteTimeout: 2 * time.Second,
  63. }
  64. go func() {
  65. err := httpServer.ListenAndServe()
  66. So(err, ShouldBeNil)
  67. }()
  68. Convey("Worker should work", t, func(ctx C) {
  69. workerCfg := Config{
  70. Global: globalConfig{
  71. Name: "dut",
  72. LogDir: "/tmp",
  73. MirrorDir: "/tmp",
  74. Concurrent: 2,
  75. Interval: 1,
  76. },
  77. Manager: managerConfig{
  78. APIBase: "http://localhost:" + strconv.Itoa(managerPort),
  79. },
  80. }
  81. Convey("with no job", func(ctx C) {
  82. dummyTester := func(*Worker) {
  83. for {
  84. select {
  85. case data := <-recvDataChan:
  86. if reg, ok := data.(WorkerStatus); ok {
  87. So(reg.ID, ShouldEqual, "dut")
  88. } else if sch, ok := data.(MirrorSchedules); ok {
  89. So(len(sch.Schedules), ShouldEqual, 0)
  90. }
  91. case <-time.After(2 * time.Second):
  92. return
  93. }
  94. }
  95. }
  96. startWorkerThenStop(&workerCfg, dummyTester)
  97. })
  98. Convey("with one job", func(ctx C) {
  99. workerCfg.Mirrors = []mirrorConfig{
  100. mirrorConfig{
  101. Name: "job-ls",
  102. Provider: provCommand,
  103. Command: "ls",
  104. },
  105. }
  106. dummyTester := func(*Worker) {
  107. for {
  108. select {
  109. case data := <-recvDataChan:
  110. if reg, ok := data.(WorkerStatus); ok {
  111. So(reg.ID, ShouldEqual, "dut")
  112. } else if sch, ok := data.(MirrorSchedules); ok {
  113. So(len(sch.Schedules), ShouldEqual, 1)
  114. So(sch.Schedules[0].MirrorName, ShouldEqual, "job-ls")
  115. So(sch.Schedules[0].NextSchedule,
  116. ShouldHappenBetween,
  117. time.Now().Add(-2*time.Second),
  118. time.Now().Add(1*time.Minute))
  119. }
  120. case <-time.After(2 * time.Second):
  121. return
  122. }
  123. }
  124. }
  125. startWorkerThenStop(&workerCfg, dummyTester)
  126. })
  127. Convey("with several jobs", func(ctx C) {
  128. workerCfg.Mirrors = []mirrorConfig{
  129. mirrorConfig{
  130. Name: "job-ls-1",
  131. Provider: provCommand,
  132. Command: "ls",
  133. },
  134. mirrorConfig{
  135. Name: "job-fail",
  136. Provider: provCommand,
  137. Command: "non-existent-command-xxxx",
  138. },
  139. mirrorConfig{
  140. Name: "job-ls-2",
  141. Provider: provCommand,
  142. Command: "ls",
  143. },
  144. }
  145. dummyTester := func(*Worker) {
  146. for {
  147. select {
  148. case data := <-recvDataChan:
  149. if reg, ok := data.(WorkerStatus); ok {
  150. So(reg.ID, ShouldEqual, "dut")
  151. } else if sch, ok := data.(MirrorSchedules); ok {
  152. So(len(sch.Schedules), ShouldEqual, 3)
  153. }
  154. case <-time.After(2 * time.Second):
  155. return
  156. }
  157. }
  158. }
  159. startWorkerThenStop(&workerCfg, dummyTester)
  160. })
  161. })
  162. }