server_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "math/rand"
  7. "net/http"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. . "github.com/smartystreets/goconvey/convey"
  13. . "github.com/tuna/tunasync/internal"
  14. )
  15. const (
  16. _magicBadWorkerID = "magic_bad_worker_id"
  17. )
  18. func TestHTTPServer(t *testing.T) {
  19. var listenPort = 5000
  20. Convey("HTTP server should work", t, func(ctx C) {
  21. listenPort++
  22. port := listenPort
  23. addr := "127.0.0.1"
  24. baseURL := fmt.Sprintf("http://%s:%d", addr, port)
  25. InitLogger(true, true, false)
  26. s := GetTUNASyncManager(&Config{Debug: true})
  27. s.cfg.Server.Addr = addr
  28. s.cfg.Server.Port = port
  29. So(s, ShouldNotBeNil)
  30. s.setDBAdapter(&mockDBAdapter{
  31. workerStore: map[string]WorkerStatus{
  32. _magicBadWorkerID: WorkerStatus{
  33. ID: _magicBadWorkerID,
  34. }},
  35. statusStore: make(map[string]MirrorStatus),
  36. })
  37. go s.Run()
  38. time.Sleep(50 * time.Millisecond)
  39. resp, err := http.Get(baseURL + "/ping")
  40. So(err, ShouldBeNil)
  41. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  42. So(resp.Header.Get("Content-Type"), ShouldEqual, "application/json; charset=utf-8")
  43. defer resp.Body.Close()
  44. body, err := ioutil.ReadAll(resp.Body)
  45. So(err, ShouldBeNil)
  46. var p map[string]string
  47. err = json.Unmarshal(body, &p)
  48. So(err, ShouldBeNil)
  49. So(p[_infoKey], ShouldEqual, "pong")
  50. Convey("when database fail", func(ctx C) {
  51. resp, err := http.Get(fmt.Sprintf("%s/workers/%s/jobs", baseURL, _magicBadWorkerID))
  52. So(err, ShouldBeNil)
  53. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  54. defer resp.Body.Close()
  55. var msg map[string]string
  56. err = json.NewDecoder(resp.Body).Decode(&msg)
  57. So(err, ShouldBeNil)
  58. So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
  59. })
  60. Convey("when register a worker", func(ctx C) {
  61. w := WorkerStatus{
  62. ID: "test_worker1",
  63. }
  64. resp, err := PostJSON(baseURL+"/workers", w, nil)
  65. So(err, ShouldBeNil)
  66. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  67. Convey("list all workers", func(ctx C) {
  68. resp, err := http.Get(baseURL + "/workers")
  69. So(err, ShouldBeNil)
  70. defer resp.Body.Close()
  71. var actualResponseObj []WorkerStatus
  72. err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
  73. So(err, ShouldBeNil)
  74. So(len(actualResponseObj), ShouldEqual, 2)
  75. })
  76. Convey("delete an existent worker", func(ctx C) {
  77. req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
  78. So(err, ShouldBeNil)
  79. clt := &http.Client{}
  80. resp, err := clt.Do(req)
  81. So(err, ShouldBeNil)
  82. defer resp.Body.Close()
  83. res := map[string]string{}
  84. err = json.NewDecoder(resp.Body).Decode(&res)
  85. So(err, ShouldBeNil)
  86. So(res[_infoKey], ShouldEqual, "deleted")
  87. })
  88. Convey("delete non-existent worker", func(ctx C) {
  89. invalidWorker := "test_worker233"
  90. req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
  91. So(err, ShouldBeNil)
  92. clt := &http.Client{}
  93. resp, err := clt.Do(req)
  94. So(err, ShouldBeNil)
  95. defer resp.Body.Close()
  96. res := map[string]string{}
  97. err = json.NewDecoder(resp.Body).Decode(&res)
  98. So(err, ShouldBeNil)
  99. So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  100. })
  101. Convey("flush disabled jobs", func(ctx C) {
  102. req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
  103. So(err, ShouldBeNil)
  104. clt := &http.Client{}
  105. resp, err := clt.Do(req)
  106. So(err, ShouldBeNil)
  107. defer resp.Body.Close()
  108. res := map[string]string{}
  109. err = json.NewDecoder(resp.Body).Decode(&res)
  110. So(err, ShouldBeNil)
  111. So(res[_infoKey], ShouldEqual, "flushed")
  112. })
  113. Convey("update mirror status of a existed worker", func(ctx C) {
  114. status := MirrorStatus{
  115. Name: "arch-sync1",
  116. Worker: "test_worker1",
  117. IsMaster: true,
  118. Status: Success,
  119. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  120. Size: "unknown",
  121. }
  122. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  123. defer resp.Body.Close()
  124. So(err, ShouldBeNil)
  125. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  126. Convey("list mirror status of an existed worker", func(ctx C) {
  127. var ms []MirrorStatus
  128. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  129. So(err, ShouldBeNil)
  130. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  131. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  132. m := ms[0]
  133. So(m.Name, ShouldEqual, status.Name)
  134. So(m.Worker, ShouldEqual, status.Worker)
  135. So(m.Status, ShouldEqual, status.Status)
  136. So(m.Upstream, ShouldEqual, status.Upstream)
  137. So(m.Size, ShouldEqual, status.Size)
  138. So(m.IsMaster, ShouldEqual, status.IsMaster)
  139. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  140. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  141. })
  142. Convey("list all job status of all workers", func(ctx C) {
  143. var ms []WebMirrorStatus
  144. resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
  145. So(err, ShouldBeNil)
  146. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  147. m := ms[0]
  148. So(m.Name, ShouldEqual, status.Name)
  149. So(m.Status, ShouldEqual, status.Status)
  150. So(m.Upstream, ShouldEqual, status.Upstream)
  151. So(m.Size, ShouldEqual, status.Size)
  152. So(m.IsMaster, ShouldEqual, status.IsMaster)
  153. So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
  154. So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 1*time.Second)
  155. })
  156. Convey("Update size of a valid mirror", func(ctx C) {
  157. msg := struct {
  158. Name string `json:"name"`
  159. Size string `json:"size"`
  160. }{status.Name, "5GB"}
  161. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  162. resp, err := PostJSON(url, msg, nil)
  163. So(err, ShouldBeNil)
  164. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  165. Convey("Get new size of a mirror", func(ctx C) {
  166. var ms []MirrorStatus
  167. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  168. So(err, ShouldBeNil)
  169. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  170. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  171. m := ms[0]
  172. So(m.Name, ShouldEqual, status.Name)
  173. So(m.Worker, ShouldEqual, status.Worker)
  174. So(m.Status, ShouldEqual, status.Status)
  175. So(m.Upstream, ShouldEqual, status.Upstream)
  176. So(m.Size, ShouldEqual, "5GB")
  177. So(m.IsMaster, ShouldEqual, status.IsMaster)
  178. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  179. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  180. })
  181. })
  182. Convey("Update schedule of valid mirrors", func(ctx C) {
  183. msg := MirrorSchedules{
  184. []MirrorSchedule{
  185. MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
  186. MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
  187. },
  188. }
  189. url := fmt.Sprintf("%s/workers/%s/schedules", baseURL, status.Worker)
  190. resp, err := PostJSON(url, msg, nil)
  191. So(err, ShouldBeNil)
  192. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  193. })
  194. Convey("Update size of an invalid mirror", func(ctx C) {
  195. msg := struct {
  196. Name string `json:"name"`
  197. Size string `json:"size"`
  198. }{"Invalid mirror", "5GB"}
  199. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  200. resp, err := PostJSON(url, msg, nil)
  201. So(err, ShouldBeNil)
  202. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  203. })
  204. // what if status changed to failed
  205. status.Status = Failed
  206. time.Sleep(3 * time.Second)
  207. resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  208. defer resp.Body.Close()
  209. So(err, ShouldBeNil)
  210. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  211. Convey("What if syncing job failed", func(ctx C) {
  212. var ms []MirrorStatus
  213. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  214. So(err, ShouldBeNil)
  215. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  216. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  217. m := ms[0]
  218. So(m.Name, ShouldEqual, status.Name)
  219. So(m.Worker, ShouldEqual, status.Worker)
  220. So(m.Status, ShouldEqual, status.Status)
  221. So(m.Upstream, ShouldEqual, status.Upstream)
  222. So(m.Size, ShouldEqual, status.Size)
  223. So(m.IsMaster, ShouldEqual, status.IsMaster)
  224. So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
  225. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  226. })
  227. })
  228. Convey("update mirror status of an inexisted worker", func(ctx C) {
  229. invalidWorker := "test_worker2"
  230. status := MirrorStatus{
  231. Name: "arch-sync2",
  232. Worker: invalidWorker,
  233. IsMaster: true,
  234. Status: Success,
  235. LastUpdate: time.Now(),
  236. LastEnded: time.Now(),
  237. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  238. Size: "4GB",
  239. }
  240. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
  241. baseURL, status.Worker, status.Name), status, nil)
  242. So(err, ShouldBeNil)
  243. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  244. defer resp.Body.Close()
  245. var msg map[string]string
  246. err = json.NewDecoder(resp.Body).Decode(&msg)
  247. So(err, ShouldBeNil)
  248. So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  249. })
  250. Convey("update schedule of an non-existent worker", func(ctx C) {
  251. invalidWorker := "test_worker2"
  252. sch := MirrorSchedules{
  253. []MirrorSchedule{
  254. MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
  255. MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
  256. },
  257. }
  258. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/schedules",
  259. baseURL, invalidWorker), sch, nil)
  260. So(err, ShouldBeNil)
  261. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  262. defer resp.Body.Close()
  263. var msg map[string]string
  264. err = json.NewDecoder(resp.Body).Decode(&msg)
  265. So(err, ShouldBeNil)
  266. So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  267. })
  268. Convey("handle client command", func(ctx C) {
  269. cmdChan := make(chan WorkerCmd, 1)
  270. workerServer := makeMockWorkerServer(cmdChan)
  271. workerPort := rand.Intn(10000) + 30000
  272. bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort)
  273. workerBaseURL := fmt.Sprintf("http://%s", bindAddress)
  274. w := WorkerStatus{
  275. ID: "test_worker_cmd",
  276. URL: workerBaseURL + "/cmd",
  277. }
  278. resp, err := PostJSON(baseURL+"/workers", w, nil)
  279. So(err, ShouldBeNil)
  280. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  281. go func() {
  282. // run the mock worker server
  283. workerServer.Run(bindAddress)
  284. }()
  285. time.Sleep(50 * time.Microsecond)
  286. // verify the worker mock server is running
  287. workerResp, err := http.Get(workerBaseURL + "/ping")
  288. defer workerResp.Body.Close()
  289. So(err, ShouldBeNil)
  290. So(workerResp.StatusCode, ShouldEqual, http.StatusOK)
  291. Convey("when client send wrong cmd", func(ctx C) {
  292. clientCmd := ClientCmd{
  293. Cmd: CmdStart,
  294. MirrorID: "ubuntu-sync",
  295. WorkerID: "not_exist_worker",
  296. }
  297. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  298. defer resp.Body.Close()
  299. So(err, ShouldBeNil)
  300. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  301. })
  302. Convey("when client send correct cmd", func(ctx C) {
  303. clientCmd := ClientCmd{
  304. Cmd: CmdStart,
  305. MirrorID: "ubuntu-sync",
  306. WorkerID: w.ID,
  307. }
  308. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  309. defer resp.Body.Close()
  310. So(err, ShouldBeNil)
  311. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  312. time.Sleep(50 * time.Microsecond)
  313. select {
  314. case cmd := <-cmdChan:
  315. ctx.So(cmd.Cmd, ShouldEqual, clientCmd.Cmd)
  316. ctx.So(cmd.MirrorID, ShouldEqual, clientCmd.MirrorID)
  317. default:
  318. ctx.So(0, ShouldEqual, 1)
  319. }
  320. })
  321. })
  322. })
  323. })
  324. }
  325. type mockDBAdapter struct {
  326. workerStore map[string]WorkerStatus
  327. statusStore map[string]MirrorStatus
  328. }
  329. func (b *mockDBAdapter) Init() error {
  330. return nil
  331. }
  332. func (b *mockDBAdapter) ListWorkers() ([]WorkerStatus, error) {
  333. workers := make([]WorkerStatus, len(b.workerStore))
  334. idx := 0
  335. for _, w := range b.workerStore {
  336. workers[idx] = w
  337. idx++
  338. }
  339. return workers, nil
  340. }
  341. func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
  342. w, ok := b.workerStore[workerID]
  343. if !ok {
  344. return WorkerStatus{}, fmt.Errorf("invalid workerId")
  345. }
  346. return w, nil
  347. }
  348. func (b *mockDBAdapter) DeleteWorker(workerID string) error {
  349. delete(b.workerStore, workerID)
  350. return nil
  351. }
  352. func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  353. // _, ok := b.workerStore[w.ID]
  354. // if ok {
  355. // return workerStatus{}, fmt.Errorf("duplicate worker name")
  356. // }
  357. b.workerStore[w.ID] = w
  358. return w, nil
  359. }
  360. func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
  361. id := mirrorID + "/" + workerID
  362. status, ok := b.statusStore[id]
  363. if !ok {
  364. return MirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
  365. }
  366. return status, nil
  367. }
  368. func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  369. // if _, ok := b.workerStore[workerID]; !ok {
  370. // // unregistered worker
  371. // return MirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID)
  372. // }
  373. id := mirrorID + "/" + workerID
  374. b.statusStore[id] = status
  375. return status, nil
  376. }
  377. func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]MirrorStatus, error) {
  378. var mirrorStatusList []MirrorStatus
  379. // simulating a database fail
  380. if workerID == _magicBadWorkerID {
  381. return []MirrorStatus{}, fmt.Errorf("database fail")
  382. }
  383. for k, v := range b.statusStore {
  384. if wID := strings.Split(k, "/")[1]; wID == workerID {
  385. mirrorStatusList = append(mirrorStatusList, v)
  386. }
  387. }
  388. return mirrorStatusList, nil
  389. }
  390. func (b *mockDBAdapter) ListAllMirrorStatus() ([]MirrorStatus, error) {
  391. var mirrorStatusList []MirrorStatus
  392. for _, v := range b.statusStore {
  393. mirrorStatusList = append(mirrorStatusList, v)
  394. }
  395. return mirrorStatusList, nil
  396. }
  397. func (b *mockDBAdapter) Close() error {
  398. return nil
  399. }
  400. func (b *mockDBAdapter) FlushDisabledJobs() error {
  401. return nil
  402. }
  403. func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine {
  404. r := gin.Default()
  405. r.GET("/ping", func(c *gin.Context) {
  406. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  407. })
  408. r.POST("/cmd", func(c *gin.Context) {
  409. var cmd WorkerCmd
  410. c.BindJSON(&cmd)
  411. cmdChan <- cmd
  412. })
  413. return r
  414. }