2
0

worker_test.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package worker
  2. import (
  3. "net/http"
  4. "strconv"
  5. "testing"
  6. "time"
  7. "github.com/gin-gonic/gin"
  8. . "github.com/smartystreets/goconvey/convey"
  9. . "github.com/tuna/tunasync/internal"
  10. )
  11. type workTestFunc func(*Worker)
  12. var managerPort = 5001
  13. var workerPort = 5002
  14. func makeMockManagerServer(recvData chan interface{}) *gin.Engine {
  15. r := gin.Default()
  16. r.GET("/ping", func(c *gin.Context) {
  17. c.JSON(http.StatusOK, gin.H{"_infoKey": "pong"})
  18. })
  19. r.POST("/workers", func(c *gin.Context) {
  20. var _worker WorkerStatus
  21. c.BindJSON(&_worker)
  22. _worker.LastOnline = time.Now()
  23. _worker.LastRegister = time.Now()
  24. recvData <- _worker
  25. c.JSON(http.StatusOK, _worker)
  26. })
  27. r.POST("/workers/dut/schedules", func(c *gin.Context) {
  28. var _sch MirrorSchedules
  29. c.BindJSON(&_sch)
  30. recvData <- _sch
  31. c.JSON(http.StatusOK, empty{})
  32. })
  33. r.POST("/workers/dut/jobs/:job", func(c *gin.Context) {
  34. var status MirrorStatus
  35. c.BindJSON(&status)
  36. recvData <- status
  37. c.JSON(http.StatusOK, status)
  38. })
  39. r.GET("/workers/dut/jobs", func(c *gin.Context) {
  40. mirrorStatusList := []MirrorStatus{}
  41. c.JSON(http.StatusOK, mirrorStatusList)
  42. })
  43. return r
  44. }
  45. func startWorkerThenStop(cfg *Config, tester workTestFunc) {
  46. exitedChan := make(chan int)
  47. w := NewTUNASyncWorker(cfg)
  48. So(w, ShouldNotBeNil)
  49. go func() {
  50. w.Run()
  51. exitedChan <- 1
  52. }()
  53. tester(w)
  54. w.Halt()
  55. select {
  56. case exited := <-exitedChan:
  57. So(exited, ShouldEqual, 1)
  58. case <-time.After(2 * time.Second):
  59. So(0, ShouldEqual, 1)
  60. }
  61. }
  62. func sendCommandToWorker(workerURL string, httpClient *http.Client, cmd CmdVerb, mirror string) {
  63. workerCmd := WorkerCmd{
  64. Cmd: cmd,
  65. MirrorID: mirror,
  66. }
  67. logger.Debugf("POST to %s with cmd %s", workerURL, cmd)
  68. _, err := PostJSON(workerURL, workerCmd, httpClient)
  69. So(err, ShouldBeNil)
  70. }
  71. func TestWorker(t *testing.T) {
  72. InitLogger(false, true, false)
  73. recvDataChan := make(chan interface{})
  74. _s := makeMockManagerServer(recvDataChan)
  75. httpServer := &http.Server{
  76. Addr: "localhost:" + strconv.Itoa(managerPort),
  77. Handler: _s,
  78. ReadTimeout: 2 * time.Second,
  79. WriteTimeout: 2 * time.Second,
  80. }
  81. go func() {
  82. err := httpServer.ListenAndServe()
  83. So(err, ShouldBeNil)
  84. }()
  85. // Wait for http server starting
  86. time.Sleep(500 * time.Millisecond)
  87. Convey("Worker should work", t, func(ctx C) {
  88. httpClient, err := CreateHTTPClient("")
  89. So(err, ShouldBeNil)
  90. workerPort++
  91. workerCfg := Config{
  92. Global: globalConfig{
  93. Name: "dut",
  94. LogDir: "/tmp",
  95. MirrorDir: "/tmp",
  96. Concurrent: 2,
  97. Interval: 1,
  98. },
  99. Server: serverConfig{
  100. Hostname: "localhost",
  101. Addr: "127.0.0.1",
  102. Port: workerPort,
  103. },
  104. Manager: managerConfig{
  105. APIBase: "http://localhost:" + strconv.Itoa(managerPort),
  106. },
  107. }
  108. logger.Debugf("worker port %d", workerPort)
  109. Convey("with no job", func(ctx C) {
  110. dummyTester := func(*Worker) {
  111. registered := false
  112. for {
  113. select {
  114. case data := <-recvDataChan:
  115. if reg, ok := data.(WorkerStatus); ok {
  116. So(reg.ID, ShouldEqual, "dut")
  117. registered = true
  118. time.Sleep(500 * time.Millisecond)
  119. sendCommandToWorker(reg.URL, httpClient, CmdStart, "foobar")
  120. } else if sch, ok := data.(MirrorSchedules); ok {
  121. So(len(sch.Schedules), ShouldEqual, 0)
  122. }
  123. case <-time.After(2 * time.Second):
  124. So(registered, ShouldBeTrue)
  125. return
  126. }
  127. }
  128. }
  129. startWorkerThenStop(&workerCfg, dummyTester)
  130. })
  131. Convey("with one job", func(ctx C) {
  132. workerCfg.Mirrors = []mirrorConfig{
  133. {
  134. Name: "job-ls",
  135. Provider: provCommand,
  136. Command: "ls",
  137. },
  138. }
  139. dummyTester := func(*Worker) {
  140. url := ""
  141. jobRunning := false
  142. lastStatus := SyncStatus(None)
  143. for {
  144. select {
  145. case data := <-recvDataChan:
  146. if reg, ok := data.(WorkerStatus); ok {
  147. So(reg.ID, ShouldEqual, "dut")
  148. url = reg.URL
  149. time.Sleep(500 * time.Millisecond)
  150. sendCommandToWorker(url, httpClient, CmdStart, "job-ls")
  151. } else if sch, ok := data.(MirrorSchedules); ok {
  152. if !jobRunning {
  153. So(len(sch.Schedules), ShouldEqual, 1)
  154. So(sch.Schedules[0].MirrorName, ShouldEqual, "job-ls")
  155. So(sch.Schedules[0].NextSchedule,
  156. ShouldHappenBetween,
  157. time.Now().Add(-2*time.Second),
  158. time.Now().Add(1*time.Minute))
  159. }
  160. } else if status, ok := data.(MirrorStatus); ok {
  161. logger.Noticef("Job %s status %s", status.Name, status.Status.String())
  162. jobRunning = status.Status == PreSyncing || status.Status == Syncing
  163. So(status.Status, ShouldNotEqual, Failed)
  164. lastStatus = status.Status
  165. }
  166. case <-time.After(2 * time.Second):
  167. So(url, ShouldNotEqual, "")
  168. So(jobRunning, ShouldBeFalse)
  169. So(lastStatus, ShouldEqual, Success)
  170. return
  171. }
  172. }
  173. }
  174. startWorkerThenStop(&workerCfg, dummyTester)
  175. })
  176. Convey("with several jobs", func(ctx C) {
  177. workerCfg.Mirrors = []mirrorConfig{
  178. {
  179. Name: "job-ls-1",
  180. Provider: provCommand,
  181. Command: "ls",
  182. },
  183. {
  184. Name: "job-fail",
  185. Provider: provCommand,
  186. Command: "non-existent-command-xxxx",
  187. },
  188. {
  189. Name: "job-ls-2",
  190. Provider: provCommand,
  191. Command: "ls",
  192. },
  193. }
  194. dummyTester := func(*Worker) {
  195. url := ""
  196. lastStatus := make(map[string]SyncStatus)
  197. nextSch := make(map[string]time.Time)
  198. for {
  199. select {
  200. case data := <-recvDataChan:
  201. if reg, ok := data.(WorkerStatus); ok {
  202. So(reg.ID, ShouldEqual, "dut")
  203. url = reg.URL
  204. time.Sleep(500 * time.Millisecond)
  205. sendCommandToWorker(url, httpClient, CmdStart, "job-fail")
  206. sendCommandToWorker(url, httpClient, CmdStart, "job-ls-1")
  207. sendCommandToWorker(url, httpClient, CmdStart, "job-ls-2")
  208. } else if sch, ok := data.(MirrorSchedules); ok {
  209. //So(len(sch.Schedules), ShouldEqual, 3)
  210. for _, item := range sch.Schedules {
  211. nextSch[item.MirrorName] = item.NextSchedule
  212. }
  213. } else if status, ok := data.(MirrorStatus); ok {
  214. logger.Noticef("Job %s status %s", status.Name, status.Status.String())
  215. jobRunning := status.Status == PreSyncing || status.Status == Syncing
  216. if !jobRunning {
  217. if status.Name == "job-fail" {
  218. So(status.Status, ShouldEqual, Failed)
  219. } else {
  220. So(status.Status, ShouldNotEqual, Failed)
  221. }
  222. }
  223. lastStatus[status.Name] = status.Status
  224. }
  225. case <-time.After(2 * time.Second):
  226. So(len(lastStatus), ShouldEqual, 3)
  227. So(len(nextSch), ShouldEqual, 3)
  228. return
  229. }
  230. }
  231. }
  232. startWorkerThenStop(&workerCfg, dummyTester)
  233. })
  234. })
  235. }