server_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "math/rand"
  7. "net/http"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. . "github.com/smartystreets/goconvey/convey"
  13. . "github.com/tuna/tunasync/internal"
  14. )
  15. const (
  16. _magicBadWorkerID = "magic_bad_worker_id"
  17. )
  18. func TestHTTPServer(t *testing.T) {
  19. Convey("HTTP server should work", t, func(ctx C) {
  20. InitLogger(true, true, false)
  21. s := GetTUNASyncManager(&Config{Debug: false})
  22. So(s, ShouldNotBeNil)
  23. s.setDBAdapter(&mockDBAdapter{
  24. workerStore: map[string]WorkerStatus{
  25. _magicBadWorkerID: WorkerStatus{
  26. ID: _magicBadWorkerID,
  27. }},
  28. statusStore: make(map[string]MirrorStatus),
  29. })
  30. port := rand.Intn(10000) + 20000
  31. baseURL := fmt.Sprintf("http://127.0.0.1:%d", port)
  32. go func() {
  33. s.engine.Run(fmt.Sprintf("127.0.0.1:%d", port))
  34. }()
  35. time.Sleep(50 * time.Microsecond)
  36. resp, err := http.Get(baseURL + "/ping")
  37. So(err, ShouldBeNil)
  38. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  39. So(resp.Header.Get("Content-Type"), ShouldEqual, "application/json; charset=utf-8")
  40. defer resp.Body.Close()
  41. body, err := ioutil.ReadAll(resp.Body)
  42. So(err, ShouldBeNil)
  43. var p map[string]string
  44. err = json.Unmarshal(body, &p)
  45. So(err, ShouldBeNil)
  46. So(p[_infoKey], ShouldEqual, "pong")
  47. Convey("when database fail", func(ctx C) {
  48. resp, err := http.Get(fmt.Sprintf("%s/workers/%s/jobs", baseURL, _magicBadWorkerID))
  49. So(err, ShouldBeNil)
  50. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  51. defer resp.Body.Close()
  52. var msg map[string]string
  53. err = json.NewDecoder(resp.Body).Decode(&msg)
  54. So(err, ShouldBeNil)
  55. So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
  56. })
  57. Convey("when register a worker", func(ctx C) {
  58. w := WorkerStatus{
  59. ID: "test_worker1",
  60. }
  61. resp, err := PostJSON(baseURL+"/workers", w, nil)
  62. So(err, ShouldBeNil)
  63. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  64. Convey("list all workers", func(ctx C) {
  65. resp, err := http.Get(baseURL + "/workers")
  66. So(err, ShouldBeNil)
  67. defer resp.Body.Close()
  68. var actualResponseObj []WorkerStatus
  69. err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
  70. So(err, ShouldBeNil)
  71. So(len(actualResponseObj), ShouldEqual, 2)
  72. })
  73. Convey("flush disabled jobs", func(ctx C) {
  74. req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
  75. So(err, ShouldBeNil)
  76. clt := &http.Client{}
  77. resp, err := clt.Do(req)
  78. So(err, ShouldBeNil)
  79. defer resp.Body.Close()
  80. res := map[string]string{}
  81. err = json.NewDecoder(resp.Body).Decode(&res)
  82. So(err, ShouldBeNil)
  83. So(res[_infoKey], ShouldEqual, "flushed")
  84. })
  85. Convey("update mirror status of a existed worker", func(ctx C) {
  86. status := MirrorStatus{
  87. Name: "arch-sync1",
  88. Worker: "test_worker1",
  89. IsMaster: true,
  90. Status: Success,
  91. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  92. Size: "unknown",
  93. }
  94. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  95. defer resp.Body.Close()
  96. So(err, ShouldBeNil)
  97. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  98. Convey("list mirror status of an existed worker", func(ctx C) {
  99. var ms []MirrorStatus
  100. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  101. So(err, ShouldBeNil)
  102. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  103. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  104. m := ms[0]
  105. So(m.Name, ShouldEqual, status.Name)
  106. So(m.Worker, ShouldEqual, status.Worker)
  107. So(m.Status, ShouldEqual, status.Status)
  108. So(m.Upstream, ShouldEqual, status.Upstream)
  109. So(m.Size, ShouldEqual, status.Size)
  110. So(m.IsMaster, ShouldEqual, status.IsMaster)
  111. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  112. })
  113. Convey("list all job status of all workers", func(ctx C) {
  114. var ms []webMirrorStatus
  115. resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
  116. So(err, ShouldBeNil)
  117. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  118. m := ms[0]
  119. So(m.Name, ShouldEqual, status.Name)
  120. So(m.Status, ShouldEqual, status.Status)
  121. So(m.Upstream, ShouldEqual, status.Upstream)
  122. So(m.Size, ShouldEqual, status.Size)
  123. So(m.IsMaster, ShouldEqual, status.IsMaster)
  124. So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
  125. })
  126. Convey("Update size of a valid mirror", func(ctx C) {
  127. msg := struct {
  128. Name string `json:"name"`
  129. Size string `json:"size"`
  130. }{status.Name, "5GB"}
  131. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  132. resp, err := PostJSON(url, msg, nil)
  133. So(err, ShouldBeNil)
  134. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  135. Convey("Get new size of a mirror", func(ctx C) {
  136. var ms []MirrorStatus
  137. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  138. So(err, ShouldBeNil)
  139. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  140. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  141. m := ms[0]
  142. So(m.Name, ShouldEqual, status.Name)
  143. So(m.Worker, ShouldEqual, status.Worker)
  144. So(m.Status, ShouldEqual, status.Status)
  145. So(m.Upstream, ShouldEqual, status.Upstream)
  146. So(m.Size, ShouldEqual, "5GB")
  147. So(m.IsMaster, ShouldEqual, status.IsMaster)
  148. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  149. })
  150. })
  151. Convey("Update size of an invalid mirror", func(ctx C) {
  152. msg := struct {
  153. Name string `json:"name"`
  154. Size string `json:"size"`
  155. }{"Invalid mirror", "5GB"}
  156. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  157. resp, err := PostJSON(url, msg, nil)
  158. So(err, ShouldBeNil)
  159. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  160. })
  161. })
  162. Convey("update mirror status of an inexisted worker", func(ctx C) {
  163. invalidWorker := "test_worker2"
  164. status := MirrorStatus{
  165. Name: "arch-sync2",
  166. Worker: invalidWorker,
  167. IsMaster: true,
  168. Status: Success,
  169. LastUpdate: time.Now(),
  170. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  171. Size: "4GB",
  172. }
  173. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
  174. baseURL, status.Worker, status.Name), status, nil)
  175. So(err, ShouldBeNil)
  176. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  177. defer resp.Body.Close()
  178. var msg map[string]string
  179. err = json.NewDecoder(resp.Body).Decode(&msg)
  180. So(err, ShouldBeNil)
  181. So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  182. })
  183. Convey("handle client command", func(ctx C) {
  184. cmdChan := make(chan WorkerCmd, 1)
  185. workerServer := makeMockWorkerServer(cmdChan)
  186. workerPort := rand.Intn(10000) + 30000
  187. bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort)
  188. workerBaseURL := fmt.Sprintf("http://%s", bindAddress)
  189. w := WorkerStatus{
  190. ID: "test_worker_cmd",
  191. URL: workerBaseURL + "/cmd",
  192. }
  193. resp, err := PostJSON(baseURL+"/workers", w, nil)
  194. So(err, ShouldBeNil)
  195. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  196. go func() {
  197. // run the mock worker server
  198. workerServer.Run(bindAddress)
  199. }()
  200. time.Sleep(50 * time.Microsecond)
  201. // verify the worker mock server is running
  202. workerResp, err := http.Get(workerBaseURL + "/ping")
  203. defer workerResp.Body.Close()
  204. So(err, ShouldBeNil)
  205. So(workerResp.StatusCode, ShouldEqual, http.StatusOK)
  206. Convey("when client send wrong cmd", func(ctx C) {
  207. clientCmd := ClientCmd{
  208. Cmd: CmdStart,
  209. MirrorID: "ubuntu-sync",
  210. WorkerID: "not_exist_worker",
  211. }
  212. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  213. defer resp.Body.Close()
  214. So(err, ShouldBeNil)
  215. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  216. })
  217. Convey("when client send correct cmd", func(ctx C) {
  218. clientCmd := ClientCmd{
  219. Cmd: CmdStart,
  220. MirrorID: "ubuntu-sync",
  221. WorkerID: w.ID,
  222. }
  223. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  224. defer resp.Body.Close()
  225. So(err, ShouldBeNil)
  226. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  227. time.Sleep(50 * time.Microsecond)
  228. select {
  229. case cmd := <-cmdChan:
  230. ctx.So(cmd.Cmd, ShouldEqual, clientCmd.Cmd)
  231. ctx.So(cmd.MirrorID, ShouldEqual, clientCmd.MirrorID)
  232. default:
  233. ctx.So(0, ShouldEqual, 1)
  234. }
  235. })
  236. })
  237. })
  238. })
  239. }
  240. type mockDBAdapter struct {
  241. workerStore map[string]WorkerStatus
  242. statusStore map[string]MirrorStatus
  243. }
  244. func (b *mockDBAdapter) Init() error {
  245. return nil
  246. }
  247. func (b *mockDBAdapter) ListWorkers() ([]WorkerStatus, error) {
  248. workers := make([]WorkerStatus, len(b.workerStore))
  249. idx := 0
  250. for _, w := range b.workerStore {
  251. workers[idx] = w
  252. idx++
  253. }
  254. return workers, nil
  255. }
  256. func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
  257. w, ok := b.workerStore[workerID]
  258. if !ok {
  259. return WorkerStatus{}, fmt.Errorf("invalid workerId")
  260. }
  261. return w, nil
  262. }
  263. func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  264. // _, ok := b.workerStore[w.ID]
  265. // if ok {
  266. // return workerStatus{}, fmt.Errorf("duplicate worker name")
  267. // }
  268. b.workerStore[w.ID] = w
  269. return w, nil
  270. }
  271. func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
  272. id := mirrorID + "/" + workerID
  273. status, ok := b.statusStore[id]
  274. if !ok {
  275. return MirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
  276. }
  277. return status, nil
  278. }
  279. func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  280. // if _, ok := b.workerStore[workerID]; !ok {
  281. // // unregistered worker
  282. // return MirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID)
  283. // }
  284. id := mirrorID + "/" + workerID
  285. b.statusStore[id] = status
  286. return status, nil
  287. }
  288. func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]MirrorStatus, error) {
  289. var mirrorStatusList []MirrorStatus
  290. // simulating a database fail
  291. if workerID == _magicBadWorkerID {
  292. return []MirrorStatus{}, fmt.Errorf("database fail")
  293. }
  294. for k, v := range b.statusStore {
  295. if wID := strings.Split(k, "/")[1]; wID == workerID {
  296. mirrorStatusList = append(mirrorStatusList, v)
  297. }
  298. }
  299. return mirrorStatusList, nil
  300. }
  301. func (b *mockDBAdapter) ListAllMirrorStatus() ([]MirrorStatus, error) {
  302. var mirrorStatusList []MirrorStatus
  303. for _, v := range b.statusStore {
  304. mirrorStatusList = append(mirrorStatusList, v)
  305. }
  306. return mirrorStatusList, nil
  307. }
  308. func (b *mockDBAdapter) Close() error {
  309. return nil
  310. }
  311. func (b *mockDBAdapter) FlushDisabledJobs() error {
  312. return nil
  313. }
  314. func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine {
  315. r := gin.Default()
  316. r.GET("/ping", func(c *gin.Context) {
  317. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  318. })
  319. r.POST("/cmd", func(c *gin.Context) {
  320. var cmd WorkerCmd
  321. c.BindJSON(&cmd)
  322. cmdChan <- cmd
  323. })
  324. return r
  325. }