worker_test.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package worker
  2. import (
  3. "net/http"
  4. "strconv"
  5. "testing"
  6. "time"
  7. "github.com/gin-gonic/gin"
  8. . "github.com/smartystreets/goconvey/convey"
  9. . "github.com/tuna/tunasync/internal"
  10. )
  11. type workTestFunc func(*Worker)
  12. var managerPort = 5001
  13. var workerPort = 5002
  14. func makeMockManagerServer(recvData chan interface{}) *gin.Engine {
  15. r := gin.Default()
  16. r.GET("/ping", func(c *gin.Context) {
  17. c.JSON(http.StatusOK, gin.H{"_infoKey": "pong"})
  18. })
  19. r.POST("/workers", func(c *gin.Context) {
  20. var _worker WorkerStatus
  21. c.BindJSON(&_worker)
  22. _worker.LastOnline = time.Now()
  23. recvData <- _worker
  24. c.JSON(http.StatusOK, _worker)
  25. })
  26. r.POST("/workers/dut/schedules", func(c *gin.Context) {
  27. var _sch MirrorSchedules
  28. c.BindJSON(&_sch)
  29. recvData <- _sch
  30. c.JSON(http.StatusOK, empty{})
  31. })
  32. r.POST("/workers/dut/jobs/:job", func(c *gin.Context) {
  33. var status MirrorStatus
  34. c.BindJSON(&status)
  35. recvData <- status
  36. c.JSON(http.StatusOK, status)
  37. })
  38. r.GET("/workers/dut/jobs", func(c *gin.Context) {
  39. mirrorStatusList := []MirrorStatus{}
  40. c.JSON(http.StatusOK, mirrorStatusList)
  41. })
  42. return r
  43. }
  44. func startWorkerThenStop(cfg *Config, tester workTestFunc) {
  45. exitedChan := make(chan int)
  46. w := NewTUNASyncWorker(cfg)
  47. So(w, ShouldNotBeNil)
  48. go func() {
  49. w.Run()
  50. exitedChan <- 1
  51. }()
  52. tester(w)
  53. w.Halt()
  54. select {
  55. case exited := <-exitedChan:
  56. So(exited, ShouldEqual, 1)
  57. case <-time.After(2 * time.Second):
  58. So(0, ShouldEqual, 1)
  59. }
  60. }
  61. func sendCommandToWorker(workerURL string, httpClient *http.Client, cmd CmdVerb, mirror string) {
  62. workerCmd := WorkerCmd{
  63. Cmd: cmd,
  64. MirrorID: mirror,
  65. }
  66. logger.Debugf("POST to %s with cmd %s", workerURL, cmd)
  67. _, err := PostJSON(workerURL, workerCmd, httpClient)
  68. So(err, ShouldBeNil)
  69. }
  70. func TestWorker(t *testing.T) {
  71. InitLogger(false, true, false)
  72. recvDataChan := make(chan interface{})
  73. _s := makeMockManagerServer(recvDataChan)
  74. httpServer := &http.Server{
  75. Addr: "localhost:" + strconv.Itoa(managerPort),
  76. Handler: _s,
  77. ReadTimeout: 2 * time.Second,
  78. WriteTimeout: 2 * time.Second,
  79. }
  80. go func() {
  81. err := httpServer.ListenAndServe()
  82. So(err, ShouldBeNil)
  83. }()
  84. Convey("Worker should work", t, func(ctx C) {
  85. httpClient, err := CreateHTTPClient("")
  86. So(err, ShouldBeNil)
  87. workerPort++
  88. workerCfg := Config{
  89. Global: globalConfig{
  90. Name: "dut",
  91. LogDir: "/tmp",
  92. MirrorDir: "/tmp",
  93. Concurrent: 2,
  94. Interval: 1,
  95. },
  96. Server: serverConfig{
  97. Hostname: "localhost",
  98. Addr: "127.0.0.1",
  99. Port: workerPort,
  100. },
  101. Manager: managerConfig{
  102. APIBase: "http://localhost:" + strconv.Itoa(managerPort),
  103. },
  104. }
  105. logger.Debugf("worker port %d", workerPort)
  106. Convey("with no job", func(ctx C) {
  107. dummyTester := func(*Worker) {
  108. registered := false
  109. for {
  110. select {
  111. case data := <-recvDataChan:
  112. if reg, ok := data.(WorkerStatus); ok {
  113. So(reg.ID, ShouldEqual, "dut")
  114. registered = true
  115. time.Sleep(500 * time.Millisecond)
  116. sendCommandToWorker(reg.URL, httpClient, CmdStart, "foobar")
  117. } else if sch, ok := data.(MirrorSchedules); ok {
  118. So(len(sch.Schedules), ShouldEqual, 0)
  119. }
  120. case <-time.After(2 * time.Second):
  121. So(registered, ShouldBeTrue)
  122. return
  123. }
  124. }
  125. }
  126. startWorkerThenStop(&workerCfg, dummyTester)
  127. })
  128. Convey("with one job", func(ctx C) {
  129. workerCfg.Mirrors = []mirrorConfig{
  130. mirrorConfig{
  131. Name: "job-ls",
  132. Provider: provCommand,
  133. Command: "ls",
  134. },
  135. }
  136. dummyTester := func(*Worker) {
  137. url := ""
  138. jobRunning := false
  139. lastStatus := SyncStatus(None)
  140. for {
  141. select {
  142. case data := <-recvDataChan:
  143. if reg, ok := data.(WorkerStatus); ok {
  144. So(reg.ID, ShouldEqual, "dut")
  145. url = reg.URL
  146. time.Sleep(500 * time.Millisecond)
  147. sendCommandToWorker(url, httpClient, CmdStart, "job-ls")
  148. } else if sch, ok := data.(MirrorSchedules); ok {
  149. if !jobRunning {
  150. So(len(sch.Schedules), ShouldEqual, 1)
  151. So(sch.Schedules[0].MirrorName, ShouldEqual, "job-ls")
  152. So(sch.Schedules[0].NextSchedule,
  153. ShouldHappenBetween,
  154. time.Now().Add(-2*time.Second),
  155. time.Now().Add(1*time.Minute))
  156. }
  157. } else if status, ok := data.(MirrorStatus); ok {
  158. logger.Noticef("Job %s status %s", status.Name, status.Status.String())
  159. jobRunning = status.Status == PreSyncing || status.Status == Syncing
  160. So(status.Status, ShouldNotEqual, Failed)
  161. lastStatus = status.Status
  162. }
  163. case <-time.After(2 * time.Second):
  164. So(url, ShouldNotEqual, "")
  165. So(jobRunning, ShouldBeFalse)
  166. So(lastStatus, ShouldEqual, Success)
  167. return
  168. }
  169. }
  170. }
  171. startWorkerThenStop(&workerCfg, dummyTester)
  172. })
  173. Convey("with several jobs", func(ctx C) {
  174. workerCfg.Mirrors = []mirrorConfig{
  175. mirrorConfig{
  176. Name: "job-ls-1",
  177. Provider: provCommand,
  178. Command: "ls",
  179. },
  180. mirrorConfig{
  181. Name: "job-fail",
  182. Provider: provCommand,
  183. Command: "non-existent-command-xxxx",
  184. },
  185. mirrorConfig{
  186. Name: "job-ls-2",
  187. Provider: provCommand,
  188. Command: "ls",
  189. },
  190. }
  191. dummyTester := func(*Worker) {
  192. url := ""
  193. lastStatus := make(map[string]SyncStatus)
  194. nextSch := make(map[string]time.Time)
  195. for {
  196. select {
  197. case data := <-recvDataChan:
  198. if reg, ok := data.(WorkerStatus); ok {
  199. So(reg.ID, ShouldEqual, "dut")
  200. url = reg.URL
  201. time.Sleep(500 * time.Millisecond)
  202. sendCommandToWorker(url, httpClient, CmdStart, "job-fail")
  203. sendCommandToWorker(url, httpClient, CmdStart, "job-ls-1")
  204. sendCommandToWorker(url, httpClient, CmdStart, "job-ls-2")
  205. } else if sch, ok := data.(MirrorSchedules); ok {
  206. //So(len(sch.Schedules), ShouldEqual, 3)
  207. for _, item := range sch.Schedules {
  208. nextSch[item.MirrorName] = item.NextSchedule
  209. }
  210. } else if status, ok := data.(MirrorStatus); ok {
  211. logger.Noticef("Job %s status %s", status.Name, status.Status.String())
  212. jobRunning := status.Status == PreSyncing || status.Status == Syncing
  213. if !jobRunning {
  214. if status.Name == "job-fail" {
  215. So(status.Status, ShouldEqual, Failed)
  216. } else {
  217. So(status.Status, ShouldNotEqual, Failed)
  218. }
  219. }
  220. lastStatus[status.Name] = status.Status
  221. }
  222. case <-time.After(2 * time.Second):
  223. So(len(lastStatus), ShouldEqual, 3)
  224. So(len(nextSch), ShouldEqual, 3)
  225. return
  226. }
  227. }
  228. }
  229. startWorkerThenStop(&workerCfg, dummyTester)
  230. })
  231. })
  232. }