server_test.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "math/rand"
  7. "net/http"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. . "github.com/smartystreets/goconvey/convey"
  13. . "github.com/tuna/tunasync/internal"
  14. )
  // _magicBadWorkerID is the worker ID for which mockDBAdapter.ListMirrorStatus
  // reports a simulated database failure.
  const _magicBadWorkerID = "magic_bad_worker_id"
  18. func TestHTTPServer(t *testing.T) {
  19. Convey("HTTP server should work", t, func(ctx C) {
  20. InitLogger(true, true, false)
  21. s := GetTUNASyncManager(&Config{Debug: false})
  22. So(s, ShouldNotBeNil)
  23. s.setDBAdapter(&mockDBAdapter{
  24. workerStore: map[string]WorkerStatus{
  25. _magicBadWorkerID: WorkerStatus{
  26. ID: _magicBadWorkerID,
  27. }},
  28. statusStore: make(map[string]MirrorStatus),
  29. })
  30. port := rand.Intn(10000) + 20000
  31. baseURL := fmt.Sprintf("http://127.0.0.1:%d", port)
  32. go func() {
  33. s.engine.Run(fmt.Sprintf("127.0.0.1:%d", port))
  34. }()
  35. time.Sleep(50 * time.Microsecond)
  36. resp, err := http.Get(baseURL + "/ping")
  37. So(err, ShouldBeNil)
  38. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  39. So(resp.Header.Get("Content-Type"), ShouldEqual, "application/json; charset=utf-8")
  40. defer resp.Body.Close()
  41. body, err := ioutil.ReadAll(resp.Body)
  42. So(err, ShouldBeNil)
  43. var p map[string]string
  44. err = json.Unmarshal(body, &p)
  45. So(err, ShouldBeNil)
  46. So(p[_infoKey], ShouldEqual, "pong")
  47. Convey("when database fail", func(ctx C) {
  48. resp, err := http.Get(fmt.Sprintf("%s/workers/%s/jobs", baseURL, _magicBadWorkerID))
  49. So(err, ShouldBeNil)
  50. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  51. defer resp.Body.Close()
  52. var msg map[string]string
  53. err = json.NewDecoder(resp.Body).Decode(&msg)
  54. So(err, ShouldBeNil)
  55. So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
  56. })
  57. Convey("when register a worker", func(ctx C) {
  58. w := WorkerStatus{
  59. ID: "test_worker1",
  60. }
  61. resp, err := PostJSON(baseURL+"/workers", w, nil)
  62. So(err, ShouldBeNil)
  63. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  64. Convey("list all workers", func(ctx C) {
  65. So(err, ShouldBeNil)
  66. resp, err := http.Get(baseURL + "/workers")
  67. So(err, ShouldBeNil)
  68. defer resp.Body.Close()
  69. var actualResponseObj []WorkerStatus
  70. err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
  71. So(err, ShouldBeNil)
  72. So(len(actualResponseObj), ShouldEqual, 2)
  73. })
  74. Convey("update mirror status of a existed worker", func(ctx C) {
  75. status := MirrorStatus{
  76. Name: "arch-sync1",
  77. Worker: "test_worker1",
  78. IsMaster: true,
  79. Status: Success,
  80. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  81. Size: "3GB",
  82. }
  83. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  84. defer resp.Body.Close()
  85. So(err, ShouldBeNil)
  86. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  87. Convey("list mirror status of an existed worker", func(ctx C) {
  88. var ms []MirrorStatus
  89. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  90. So(err, ShouldBeNil)
  91. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  92. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  93. m := ms[0]
  94. So(m.Name, ShouldEqual, status.Name)
  95. So(m.Worker, ShouldEqual, status.Worker)
  96. So(m.Status, ShouldEqual, status.Status)
  97. So(m.Upstream, ShouldEqual, status.Upstream)
  98. So(m.Size, ShouldEqual, status.Size)
  99. So(m.IsMaster, ShouldEqual, status.IsMaster)
  100. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  101. })
  102. Convey("list all job status of all workers", func(ctx C) {
  103. var ms []webMirrorStatus
  104. resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
  105. So(err, ShouldBeNil)
  106. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  107. m := ms[0]
  108. So(m.Name, ShouldEqual, status.Name)
  109. So(m.Status, ShouldEqual, status.Status)
  110. So(m.Upstream, ShouldEqual, status.Upstream)
  111. So(m.Size, ShouldEqual, status.Size)
  112. So(m.IsMaster, ShouldEqual, status.IsMaster)
  113. So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
  114. })
  115. })
  116. Convey("update mirror status of an inexisted worker", func(ctx C) {
  117. invalidWorker := "test_worker2"
  118. status := MirrorStatus{
  119. Name: "arch-sync2",
  120. Worker: invalidWorker,
  121. IsMaster: true,
  122. Status: Success,
  123. LastUpdate: time.Now(),
  124. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  125. Size: "4GB",
  126. }
  127. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
  128. baseURL, status.Worker, status.Name), status, nil)
  129. So(err, ShouldBeNil)
  130. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  131. defer resp.Body.Close()
  132. var msg map[string]string
  133. err = json.NewDecoder(resp.Body).Decode(&msg)
  134. So(err, ShouldBeNil)
  135. So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  136. })
  137. Convey("handle client command", func(ctx C) {
  138. cmdChan := make(chan WorkerCmd, 1)
  139. workerServer := makeMockWorkerServer(cmdChan)
  140. workerPort := rand.Intn(10000) + 30000
  141. bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort)
  142. workerBaseURL := fmt.Sprintf("http://%s", bindAddress)
  143. w := WorkerStatus{
  144. ID: "test_worker_cmd",
  145. URL: workerBaseURL + "/cmd",
  146. }
  147. resp, err := PostJSON(baseURL+"/workers", w, nil)
  148. So(err, ShouldBeNil)
  149. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  150. go func() {
  151. // run the mock worker server
  152. workerServer.Run(bindAddress)
  153. }()
  154. time.Sleep(50 * time.Microsecond)
  155. // verify the worker mock server is running
  156. workerResp, err := http.Get(workerBaseURL + "/ping")
  157. defer workerResp.Body.Close()
  158. So(err, ShouldBeNil)
  159. So(workerResp.StatusCode, ShouldEqual, http.StatusOK)
  160. Convey("when client send wrong cmd", func(ctx C) {
  161. clientCmd := ClientCmd{
  162. Cmd: CmdStart,
  163. MirrorID: "ubuntu-sync",
  164. WorkerID: "not_exist_worker",
  165. }
  166. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  167. defer resp.Body.Close()
  168. So(err, ShouldBeNil)
  169. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  170. })
  171. Convey("when client send correct cmd", func(ctx C) {
  172. clientCmd := ClientCmd{
  173. Cmd: CmdStart,
  174. MirrorID: "ubuntu-sync",
  175. WorkerID: w.ID,
  176. }
  177. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  178. defer resp.Body.Close()
  179. So(err, ShouldBeNil)
  180. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  181. time.Sleep(50 * time.Microsecond)
  182. select {
  183. case cmd := <-cmdChan:
  184. ctx.So(cmd.Cmd, ShouldEqual, clientCmd.Cmd)
  185. ctx.So(cmd.MirrorID, ShouldEqual, clientCmd.MirrorID)
  186. default:
  187. ctx.So(0, ShouldEqual, 1)
  188. }
  189. })
  190. })
  191. })
  192. })
  193. }
  194. type mockDBAdapter struct {
  195. workerStore map[string]WorkerStatus
  196. statusStore map[string]MirrorStatus
  197. }
  198. func (b *mockDBAdapter) Init() error {
  199. return nil
  200. }
  201. func (b *mockDBAdapter) ListWorkers() ([]WorkerStatus, error) {
  202. workers := make([]WorkerStatus, len(b.workerStore))
  203. idx := 0
  204. for _, w := range b.workerStore {
  205. workers[idx] = w
  206. idx++
  207. }
  208. return workers, nil
  209. }
  210. func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
  211. w, ok := b.workerStore[workerID]
  212. if !ok {
  213. return WorkerStatus{}, fmt.Errorf("invalid workerId")
  214. }
  215. return w, nil
  216. }
  217. func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  218. // _, ok := b.workerStore[w.ID]
  219. // if ok {
  220. // return workerStatus{}, fmt.Errorf("duplicate worker name")
  221. // }
  222. b.workerStore[w.ID] = w
  223. return w, nil
  224. }
  225. func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
  226. id := mirrorID + "/" + workerID
  227. status, ok := b.statusStore[id]
  228. if !ok {
  229. return MirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
  230. }
  231. return status, nil
  232. }
  233. func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  234. // if _, ok := b.workerStore[workerID]; !ok {
  235. // // unregistered worker
  236. // return MirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID)
  237. // }
  238. id := mirrorID + "/" + workerID
  239. b.statusStore[id] = status
  240. return status, nil
  241. }
  242. func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]MirrorStatus, error) {
  243. var mirrorStatusList []MirrorStatus
  244. // simulating a database fail
  245. if workerID == _magicBadWorkerID {
  246. return []MirrorStatus{}, fmt.Errorf("database fail")
  247. }
  248. for k, v := range b.statusStore {
  249. if wID := strings.Split(k, "/")[1]; wID == workerID {
  250. mirrorStatusList = append(mirrorStatusList, v)
  251. }
  252. }
  253. return mirrorStatusList, nil
  254. }
  255. func (b *mockDBAdapter) ListAllMirrorStatus() ([]MirrorStatus, error) {
  256. var mirrorStatusList []MirrorStatus
  257. for _, v := range b.statusStore {
  258. mirrorStatusList = append(mirrorStatusList, v)
  259. }
  260. return mirrorStatusList, nil
  261. }
  262. func (b *mockDBAdapter) Close() error {
  263. return nil
  264. }
  265. func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine {
  266. r := gin.Default()
  267. r.GET("/ping", func(c *gin.Context) {
  268. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  269. })
  270. r.POST("/cmd", func(c *gin.Context) {
  271. var cmd WorkerCmd
  272. c.BindJSON(&cmd)
  273. cmdChan <- cmd
  274. })
  275. return r
  276. }