server_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "math/rand"
  7. "net/http"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. . "github.com/smartystreets/goconvey/convey"
  13. . "github.com/tuna/tunasync/internal"
  14. )
  // _magicBadWorkerID is the worker ID for which mockDBAdapter.ListMirrorStatus
  // reports a simulated database failure.
  const _magicBadWorkerID = "magic_bad_worker_id"
  18. func TestHTTPServer(t *testing.T) {
  19. var listenPort = 5000
  20. Convey("HTTP server should work", t, func(ctx C) {
  21. listenPort++
  22. port := listenPort
  23. addr := "127.0.0.1"
  24. baseURL := fmt.Sprintf("http://%s:%d", addr, port)
  25. InitLogger(true, true, false)
  26. s := GetTUNASyncManager(&Config{Debug: true})
  27. s.cfg.Server.Addr = addr
  28. s.cfg.Server.Port = port
  29. So(s, ShouldNotBeNil)
  30. s.setDBAdapter(&mockDBAdapter{
  31. workerStore: map[string]WorkerStatus{
  32. _magicBadWorkerID: WorkerStatus{
  33. ID: _magicBadWorkerID,
  34. }},
  35. statusStore: make(map[string]MirrorStatus),
  36. })
  37. go s.Run()
  38. time.Sleep(50 * time.Millisecond)
  39. resp, err := http.Get(baseURL + "/ping")
  40. So(err, ShouldBeNil)
  41. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  42. So(resp.Header.Get("Content-Type"), ShouldEqual, "application/json; charset=utf-8")
  43. defer resp.Body.Close()
  44. body, err := ioutil.ReadAll(resp.Body)
  45. So(err, ShouldBeNil)
  46. var p map[string]string
  47. err = json.Unmarshal(body, &p)
  48. So(err, ShouldBeNil)
  49. So(p[_infoKey], ShouldEqual, "pong")
  50. Convey("when database fail", func(ctx C) {
  51. resp, err := http.Get(fmt.Sprintf("%s/workers/%s/jobs", baseURL, _magicBadWorkerID))
  52. So(err, ShouldBeNil)
  53. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  54. defer resp.Body.Close()
  55. var msg map[string]string
  56. err = json.NewDecoder(resp.Body).Decode(&msg)
  57. So(err, ShouldBeNil)
  58. So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
  59. })
  60. Convey("when register a worker", func(ctx C) {
  61. w := WorkerStatus{
  62. ID: "test_worker1",
  63. }
  64. resp, err := PostJSON(baseURL+"/workers", w, nil)
  65. So(err, ShouldBeNil)
  66. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  67. Convey("list all workers", func(ctx C) {
  68. resp, err := http.Get(baseURL + "/workers")
  69. So(err, ShouldBeNil)
  70. defer resp.Body.Close()
  71. var actualResponseObj []WorkerStatus
  72. err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
  73. So(err, ShouldBeNil)
  74. So(len(actualResponseObj), ShouldEqual, 2)
  75. })
  76. Convey("delete an existent worker", func(ctx C) {
  77. req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
  78. So(err, ShouldBeNil)
  79. clt := &http.Client{}
  80. resp, err := clt.Do(req)
  81. So(err, ShouldBeNil)
  82. defer resp.Body.Close()
  83. res := map[string]string{}
  84. err = json.NewDecoder(resp.Body).Decode(&res)
  85. So(err, ShouldBeNil)
  86. So(res[_infoKey], ShouldEqual, "deleted")
  87. })
  88. Convey("delete non-existent worker", func(ctx C) {
  89. invalidWorker := "test_worker233"
  90. req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
  91. So(err, ShouldBeNil)
  92. clt := &http.Client{}
  93. resp, err := clt.Do(req)
  94. So(err, ShouldBeNil)
  95. defer resp.Body.Close()
  96. res := map[string]string{}
  97. err = json.NewDecoder(resp.Body).Decode(&res)
  98. So(err, ShouldBeNil)
  99. So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  100. })
  101. Convey("flush disabled jobs", func(ctx C) {
  102. req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
  103. So(err, ShouldBeNil)
  104. clt := &http.Client{}
  105. resp, err := clt.Do(req)
  106. So(err, ShouldBeNil)
  107. defer resp.Body.Close()
  108. res := map[string]string{}
  109. err = json.NewDecoder(resp.Body).Decode(&res)
  110. So(err, ShouldBeNil)
  111. So(res[_infoKey], ShouldEqual, "flushed")
  112. })
  113. Convey("update mirror status of a existed worker", func(ctx C) {
  114. status := MirrorStatus{
  115. Name: "arch-sync1",
  116. Worker: "test_worker1",
  117. IsMaster: true,
  118. Status: Success,
  119. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  120. Size: "unknown",
  121. }
  122. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  123. So(err, ShouldBeNil)
  124. defer resp.Body.Close()
  125. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  126. Convey("list mirror status of an existed worker", func(ctx C) {
  127. var ms []MirrorStatus
  128. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  129. So(err, ShouldBeNil)
  130. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  131. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  132. m := ms[0]
  133. So(m.Name, ShouldEqual, status.Name)
  134. So(m.Worker, ShouldEqual, status.Worker)
  135. So(m.Status, ShouldEqual, status.Status)
  136. So(m.Upstream, ShouldEqual, status.Upstream)
  137. So(m.Size, ShouldEqual, status.Size)
  138. So(m.IsMaster, ShouldEqual, status.IsMaster)
  139. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  140. So(m.LastStarted.IsZero(), ShouldBeTrue) // hasn't been initialized yet
  141. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  142. })
  143. // start syncing
  144. status.Status = PreSyncing
  145. time.Sleep(1 * time.Second)
  146. resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  147. So(err, ShouldBeNil)
  148. defer resp.Body.Close()
  149. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  150. Convey("update mirror status to PreSync - starting sync", func(ctx C) {
  151. var ms []MirrorStatus
  152. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  153. So(err, ShouldBeNil)
  154. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  155. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  156. m := ms[0]
  157. So(m.Name, ShouldEqual, status.Name)
  158. So(m.Worker, ShouldEqual, status.Worker)
  159. So(m.Status, ShouldEqual, status.Status)
  160. So(m.Upstream, ShouldEqual, status.Upstream)
  161. So(m.Size, ShouldEqual, status.Size)
  162. So(m.IsMaster, ShouldEqual, status.IsMaster)
  163. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 3*time.Second)
  164. So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 1*time.Second)
  165. So(time.Now().Sub(m.LastStarted), ShouldBeLessThan, 2*time.Second)
  166. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 3*time.Second)
  167. So(time.Now().Sub(m.LastEnded), ShouldBeGreaterThan, 1*time.Second)
  168. })
  169. Convey("list all job status of all workers", func(ctx C) {
  170. var ms []WebMirrorStatus
  171. resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
  172. So(err, ShouldBeNil)
  173. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  174. m := ms[0]
  175. So(m.Name, ShouldEqual, status.Name)
  176. So(m.Status, ShouldEqual, status.Status)
  177. So(m.Upstream, ShouldEqual, status.Upstream)
  178. So(m.Size, ShouldEqual, status.Size)
  179. So(m.IsMaster, ShouldEqual, status.IsMaster)
  180. So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 3*time.Second)
  181. So(time.Now().Sub(m.LastStarted.Time), ShouldBeLessThan, 2*time.Second)
  182. So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 3*time.Second)
  183. })
  184. Convey("Update size of a valid mirror", func(ctx C) {
  185. msg := struct {
  186. Name string `json:"name"`
  187. Size string `json:"size"`
  188. }{status.Name, "5GB"}
  189. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  190. resp, err := PostJSON(url, msg, nil)
  191. So(err, ShouldBeNil)
  192. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  193. Convey("Get new size of a mirror", func(ctx C) {
  194. var ms []MirrorStatus
  195. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  196. So(err, ShouldBeNil)
  197. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  198. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  199. m := ms[0]
  200. So(m.Name, ShouldEqual, status.Name)
  201. So(m.Worker, ShouldEqual, status.Worker)
  202. So(m.Status, ShouldEqual, status.Status)
  203. So(m.Upstream, ShouldEqual, status.Upstream)
  204. So(m.Size, ShouldEqual, "5GB")
  205. So(m.IsMaster, ShouldEqual, status.IsMaster)
  206. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 3*time.Second)
  207. So(time.Now().Sub(m.LastStarted), ShouldBeLessThan, 2*time.Second)
  208. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 3*time.Second)
  209. })
  210. })
  211. Convey("Update schedule of valid mirrors", func(ctx C) {
  212. msg := MirrorSchedules{
  213. []MirrorSchedule{
  214. MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
  215. MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
  216. },
  217. }
  218. url := fmt.Sprintf("%s/workers/%s/schedules", baseURL, status.Worker)
  219. resp, err := PostJSON(url, msg, nil)
  220. So(err, ShouldBeNil)
  221. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  222. })
  223. Convey("Update size of an invalid mirror", func(ctx C) {
  224. msg := struct {
  225. Name string `json:"name"`
  226. Size string `json:"size"`
  227. }{"Invalid mirror", "5GB"}
  228. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  229. resp, err := PostJSON(url, msg, nil)
  230. So(err, ShouldBeNil)
  231. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  232. })
  233. // what if status changed to failed
  234. status.Status = Failed
  235. time.Sleep(3 * time.Second)
  236. resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  237. So(err, ShouldBeNil)
  238. defer resp.Body.Close()
  239. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  240. Convey("What if syncing job failed", func(ctx C) {
  241. var ms []MirrorStatus
  242. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  243. So(err, ShouldBeNil)
  244. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  245. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  246. m := ms[0]
  247. So(m.Name, ShouldEqual, status.Name)
  248. So(m.Worker, ShouldEqual, status.Worker)
  249. So(m.Status, ShouldEqual, status.Status)
  250. So(m.Upstream, ShouldEqual, status.Upstream)
  251. So(m.Size, ShouldEqual, status.Size)
  252. So(m.IsMaster, ShouldEqual, status.IsMaster)
  253. So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
  254. So(time.Now().Sub(m.LastStarted), ShouldBeGreaterThan, 3*time.Second)
  255. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  256. })
  257. })
  258. Convey("update mirror status of an inexisted worker", func(ctx C) {
  259. invalidWorker := "test_worker2"
  260. status := MirrorStatus{
  261. Name: "arch-sync2",
  262. Worker: invalidWorker,
  263. IsMaster: true,
  264. Status: Success,
  265. LastUpdate: time.Now(),
  266. LastStarted: time.Now(),
  267. LastEnded: time.Now(),
  268. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  269. Size: "4GB",
  270. }
  271. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
  272. baseURL, status.Worker, status.Name), status, nil)
  273. So(err, ShouldBeNil)
  274. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  275. defer resp.Body.Close()
  276. var msg map[string]string
  277. err = json.NewDecoder(resp.Body).Decode(&msg)
  278. So(err, ShouldBeNil)
  279. So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  280. })
  281. Convey("update schedule of an non-existent worker", func(ctx C) {
  282. invalidWorker := "test_worker2"
  283. sch := MirrorSchedules{
  284. []MirrorSchedule{
  285. MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
  286. MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
  287. },
  288. }
  289. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/schedules",
  290. baseURL, invalidWorker), sch, nil)
  291. So(err, ShouldBeNil)
  292. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  293. defer resp.Body.Close()
  294. var msg map[string]string
  295. err = json.NewDecoder(resp.Body).Decode(&msg)
  296. So(err, ShouldBeNil)
  297. So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  298. })
  299. Convey("handle client command", func(ctx C) {
  300. cmdChan := make(chan WorkerCmd, 1)
  301. workerServer := makeMockWorkerServer(cmdChan)
  302. workerPort := rand.Intn(10000) + 30000
  303. bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort)
  304. workerBaseURL := fmt.Sprintf("http://%s", bindAddress)
  305. w := WorkerStatus{
  306. ID: "test_worker_cmd",
  307. URL: workerBaseURL + "/cmd",
  308. }
  309. resp, err := PostJSON(baseURL+"/workers", w, nil)
  310. So(err, ShouldBeNil)
  311. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  312. go func() {
  313. // run the mock worker server
  314. workerServer.Run(bindAddress)
  315. }()
  316. time.Sleep(50 * time.Millisecond)
  317. // verify the worker mock server is running
  318. workerResp, err := http.Get(workerBaseURL + "/ping")
  319. So(err, ShouldBeNil)
  320. defer workerResp.Body.Close()
  321. So(workerResp.StatusCode, ShouldEqual, http.StatusOK)
  322. Convey("when client send wrong cmd", func(ctx C) {
  323. clientCmd := ClientCmd{
  324. Cmd: CmdStart,
  325. MirrorID: "ubuntu-sync",
  326. WorkerID: "not_exist_worker",
  327. }
  328. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  329. So(err, ShouldBeNil)
  330. defer resp.Body.Close()
  331. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  332. })
  333. Convey("when client send correct cmd", func(ctx C) {
  334. clientCmd := ClientCmd{
  335. Cmd: CmdStart,
  336. MirrorID: "ubuntu-sync",
  337. WorkerID: w.ID,
  338. }
  339. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  340. So(err, ShouldBeNil)
  341. defer resp.Body.Close()
  342. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  343. time.Sleep(50 * time.Microsecond)
  344. select {
  345. case cmd := <-cmdChan:
  346. ctx.So(cmd.Cmd, ShouldEqual, clientCmd.Cmd)
  347. ctx.So(cmd.MirrorID, ShouldEqual, clientCmd.MirrorID)
  348. default:
  349. ctx.So(0, ShouldEqual, 1)
  350. }
  351. })
  352. })
  353. })
  354. })
  355. }
  356. type mockDBAdapter struct {
  357. workerStore map[string]WorkerStatus
  358. statusStore map[string]MirrorStatus
  359. }
  360. func (b *mockDBAdapter) Init() error {
  361. return nil
  362. }
  363. func (b *mockDBAdapter) ListWorkers() ([]WorkerStatus, error) {
  364. workers := make([]WorkerStatus, len(b.workerStore))
  365. idx := 0
  366. for _, w := range b.workerStore {
  367. workers[idx] = w
  368. idx++
  369. }
  370. return workers, nil
  371. }
  372. func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
  373. w, ok := b.workerStore[workerID]
  374. if !ok {
  375. return WorkerStatus{}, fmt.Errorf("invalid workerId")
  376. }
  377. return w, nil
  378. }
  379. func (b *mockDBAdapter) DeleteWorker(workerID string) error {
  380. delete(b.workerStore, workerID)
  381. return nil
  382. }
  383. func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  384. // _, ok := b.workerStore[w.ID]
  385. // if ok {
  386. // return workerStatus{}, fmt.Errorf("duplicate worker name")
  387. // }
  388. b.workerStore[w.ID] = w
  389. return w, nil
  390. }
  391. func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
  392. id := mirrorID + "/" + workerID
  393. status, ok := b.statusStore[id]
  394. if !ok {
  395. return MirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
  396. }
  397. return status, nil
  398. }
  399. func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  400. // if _, ok := b.workerStore[workerID]; !ok {
  401. // // unregistered worker
  402. // return MirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID)
  403. // }
  404. id := mirrorID + "/" + workerID
  405. b.statusStore[id] = status
  406. return status, nil
  407. }
  408. func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]MirrorStatus, error) {
  409. var mirrorStatusList []MirrorStatus
  410. // simulating a database fail
  411. if workerID == _magicBadWorkerID {
  412. return []MirrorStatus{}, fmt.Errorf("database fail")
  413. }
  414. for k, v := range b.statusStore {
  415. if wID := strings.Split(k, "/")[1]; wID == workerID {
  416. mirrorStatusList = append(mirrorStatusList, v)
  417. }
  418. }
  419. return mirrorStatusList, nil
  420. }
  421. func (b *mockDBAdapter) ListAllMirrorStatus() ([]MirrorStatus, error) {
  422. var mirrorStatusList []MirrorStatus
  423. for _, v := range b.statusStore {
  424. mirrorStatusList = append(mirrorStatusList, v)
  425. }
  426. return mirrorStatusList, nil
  427. }
  428. func (b *mockDBAdapter) Close() error {
  429. return nil
  430. }
  431. func (b *mockDBAdapter) FlushDisabledJobs() error {
  432. return nil
  433. }
  434. func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine {
  435. r := gin.Default()
  436. r.GET("/ping", func(c *gin.Context) {
  437. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  438. })
  439. r.POST("/cmd", func(c *gin.Context) {
  440. var cmd WorkerCmd
  441. c.BindJSON(&cmd)
  442. cmdChan <- cmd
  443. })
  444. return r
  445. }