server_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "math/rand"
  7. "net/http"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. . "github.com/smartystreets/goconvey/convey"
  13. . "github.com/tuna/tunasync/internal"
  14. )
  15. const (
  16. _magicBadWorkerID = "magic_bad_worker_id"
  17. )
  18. func TestHTTPServer(t *testing.T) {
  19. var listenPort = 5000
  20. Convey("HTTP server should work", t, func(ctx C) {
  21. listenPort++
  22. port := listenPort
  23. addr := "127.0.0.1"
  24. baseURL := fmt.Sprintf("http://%s:%d", addr, port)
  25. InitLogger(true, true, false)
  26. s := GetTUNASyncManager(&Config{Debug: true})
  27. s.cfg.Server.Addr = addr
  28. s.cfg.Server.Port = port
  29. So(s, ShouldNotBeNil)
  30. s.setDBAdapter(&mockDBAdapter{
  31. workerStore: map[string]WorkerStatus{
  32. _magicBadWorkerID: WorkerStatus{
  33. ID: _magicBadWorkerID,
  34. }},
  35. statusStore: make(map[string]MirrorStatus),
  36. })
  37. go s.Run()
  38. time.Sleep(50 * time.Millisecond)
  39. resp, err := http.Get(baseURL + "/ping")
  40. So(err, ShouldBeNil)
  41. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  42. So(resp.Header.Get("Content-Type"), ShouldEqual, "application/json; charset=utf-8")
  43. defer resp.Body.Close()
  44. body, err := ioutil.ReadAll(resp.Body)
  45. So(err, ShouldBeNil)
  46. var p map[string]string
  47. err = json.Unmarshal(body, &p)
  48. So(err, ShouldBeNil)
  49. So(p[_infoKey], ShouldEqual, "pong")
  50. Convey("when database fail", func(ctx C) {
  51. resp, err := http.Get(fmt.Sprintf("%s/workers/%s/jobs", baseURL, _magicBadWorkerID))
  52. So(err, ShouldBeNil)
  53. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  54. defer resp.Body.Close()
  55. var msg map[string]string
  56. err = json.NewDecoder(resp.Body).Decode(&msg)
  57. So(err, ShouldBeNil)
  58. So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
  59. })
  60. Convey("when register a worker", func(ctx C) {
  61. w := WorkerStatus{
  62. ID: "test_worker1",
  63. }
  64. resp, err := PostJSON(baseURL+"/workers", w, nil)
  65. So(err, ShouldBeNil)
  66. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  67. Convey("list all workers", func(ctx C) {
  68. resp, err := http.Get(baseURL + "/workers")
  69. So(err, ShouldBeNil)
  70. defer resp.Body.Close()
  71. var actualResponseObj []WorkerStatus
  72. err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
  73. So(err, ShouldBeNil)
  74. So(len(actualResponseObj), ShouldEqual, 2)
  75. })
  76. Convey("delete an existent worker", func(ctx C) {
  77. req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
  78. So(err, ShouldBeNil)
  79. clt := &http.Client{}
  80. resp, err := clt.Do(req)
  81. So(err, ShouldBeNil)
  82. defer resp.Body.Close()
  83. res := map[string]string{}
  84. err = json.NewDecoder(resp.Body).Decode(&res)
  85. So(err, ShouldBeNil)
  86. So(res[_infoKey], ShouldEqual, "deleted")
  87. })
  88. Convey("delete non-existent worker", func(ctx C) {
  89. invalidWorker := "test_worker233"
  90. req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
  91. So(err, ShouldBeNil)
  92. clt := &http.Client{}
  93. resp, err := clt.Do(req)
  94. So(err, ShouldBeNil)
  95. defer resp.Body.Close()
  96. res := map[string]string{}
  97. err = json.NewDecoder(resp.Body).Decode(&res)
  98. So(err, ShouldBeNil)
  99. So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  100. })
  101. Convey("flush disabled jobs", func(ctx C) {
  102. req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
  103. So(err, ShouldBeNil)
  104. clt := &http.Client{}
  105. resp, err := clt.Do(req)
  106. So(err, ShouldBeNil)
  107. defer resp.Body.Close()
  108. res := map[string]string{}
  109. err = json.NewDecoder(resp.Body).Decode(&res)
  110. So(err, ShouldBeNil)
  111. So(res[_infoKey], ShouldEqual, "flushed")
  112. })
  113. Convey("update mirror status of a existed worker", func(ctx C) {
  114. status := MirrorStatus{
  115. Name: "arch-sync1",
  116. Worker: "test_worker1",
  117. IsMaster: true,
  118. Status: Success,
  119. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  120. Size: "unknown",
  121. }
  122. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  123. So(err, ShouldBeNil)
  124. defer resp.Body.Close()
  125. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  126. Convey("list mirror status of an existed worker", func(ctx C) {
  127. var ms []MirrorStatus
  128. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  129. So(err, ShouldBeNil)
  130. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  131. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  132. m := ms[0]
  133. So(m.Name, ShouldEqual, status.Name)
  134. So(m.Worker, ShouldEqual, status.Worker)
  135. So(m.Status, ShouldEqual, status.Status)
  136. So(m.Upstream, ShouldEqual, status.Upstream)
  137. So(m.Size, ShouldEqual, status.Size)
  138. So(m.IsMaster, ShouldEqual, status.IsMaster)
  139. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  140. So(time.Now().Sub(m.LastStarted), ShouldBeLessThan, 2*time.Minute)
  141. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  142. })
  143. Convey("list all job status of all workers", func(ctx C) {
  144. var ms []WebMirrorStatus
  145. resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
  146. So(err, ShouldBeNil)
  147. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  148. m := ms[0]
  149. So(m.Name, ShouldEqual, status.Name)
  150. So(m.Status, ShouldEqual, status.Status)
  151. So(m.Upstream, ShouldEqual, status.Upstream)
  152. So(m.Size, ShouldEqual, status.Size)
  153. So(m.IsMaster, ShouldEqual, status.IsMaster)
  154. So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
  155. So(time.Now().Sub(m.LastStarted.Time), ShouldBeLessThan, 2*time.Minute)
  156. So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 1*time.Second)
  157. })
  158. Convey("Update size of a valid mirror", func(ctx C) {
  159. msg := struct {
  160. Name string `json:"name"`
  161. Size string `json:"size"`
  162. }{status.Name, "5GB"}
  163. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  164. resp, err := PostJSON(url, msg, nil)
  165. So(err, ShouldBeNil)
  166. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  167. Convey("Get new size of a mirror", func(ctx C) {
  168. var ms []MirrorStatus
  169. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  170. So(err, ShouldBeNil)
  171. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  172. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  173. m := ms[0]
  174. So(m.Name, ShouldEqual, status.Name)
  175. So(m.Worker, ShouldEqual, status.Worker)
  176. So(m.Status, ShouldEqual, status.Status)
  177. So(m.Upstream, ShouldEqual, status.Upstream)
  178. So(m.Size, ShouldEqual, "5GB")
  179. So(m.IsMaster, ShouldEqual, status.IsMaster)
  180. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  181. So(time.Now().Sub(m.LastStarted), ShouldBeLessThan, 2*time.Minute)
  182. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  183. })
  184. })
  185. Convey("Update schedule of valid mirrors", func(ctx C) {
  186. msg := MirrorSchedules{
  187. []MirrorSchedule{
  188. MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
  189. MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
  190. },
  191. }
  192. url := fmt.Sprintf("%s/workers/%s/schedules", baseURL, status.Worker)
  193. resp, err := PostJSON(url, msg, nil)
  194. So(err, ShouldBeNil)
  195. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  196. })
  197. Convey("Update size of an invalid mirror", func(ctx C) {
  198. msg := struct {
  199. Name string `json:"name"`
  200. Size string `json:"size"`
  201. }{"Invalid mirror", "5GB"}
  202. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  203. resp, err := PostJSON(url, msg, nil)
  204. So(err, ShouldBeNil)
  205. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  206. })
  207. // what if status changed to failed
  208. status.Status = Failed
  209. time.Sleep(3 * time.Second)
  210. resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  211. So(err, ShouldBeNil)
  212. defer resp.Body.Close()
  213. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  214. Convey("What if syncing job failed", func(ctx C) {
  215. var ms []MirrorStatus
  216. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  217. So(err, ShouldBeNil)
  218. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  219. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  220. m := ms[0]
  221. So(m.Name, ShouldEqual, status.Name)
  222. So(m.Worker, ShouldEqual, status.Worker)
  223. So(m.Status, ShouldEqual, status.Status)
  224. So(m.Upstream, ShouldEqual, status.Upstream)
  225. So(m.Size, ShouldEqual, status.Size)
  226. So(m.IsMaster, ShouldEqual, status.IsMaster)
  227. So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
  228. So(time.Now().Sub(m.LastStarted), ShouldBeGreaterThan, 1*time.Minute)
  229. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  230. })
  231. })
  232. Convey("update mirror status of an inexisted worker", func(ctx C) {
  233. invalidWorker := "test_worker2"
  234. status := MirrorStatus{
  235. Name: "arch-sync2",
  236. Worker: invalidWorker,
  237. IsMaster: true,
  238. Status: Success,
  239. LastUpdate: time.Now(),
  240. LastStarted: time.Now(),
  241. LastEnded: time.Now(),
  242. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  243. Size: "4GB",
  244. }
  245. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
  246. baseURL, status.Worker, status.Name), status, nil)
  247. So(err, ShouldBeNil)
  248. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  249. defer resp.Body.Close()
  250. var msg map[string]string
  251. err = json.NewDecoder(resp.Body).Decode(&msg)
  252. So(err, ShouldBeNil)
  253. So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  254. })
  255. Convey("update schedule of an non-existent worker", func(ctx C) {
  256. invalidWorker := "test_worker2"
  257. sch := MirrorSchedules{
  258. []MirrorSchedule{
  259. MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
  260. MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
  261. },
  262. }
  263. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/schedules",
  264. baseURL, invalidWorker), sch, nil)
  265. So(err, ShouldBeNil)
  266. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  267. defer resp.Body.Close()
  268. var msg map[string]string
  269. err = json.NewDecoder(resp.Body).Decode(&msg)
  270. So(err, ShouldBeNil)
  271. So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  272. })
  273. Convey("handle client command", func(ctx C) {
  274. cmdChan := make(chan WorkerCmd, 1)
  275. workerServer := makeMockWorkerServer(cmdChan)
  276. workerPort := rand.Intn(10000) + 30000
  277. bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort)
  278. workerBaseURL := fmt.Sprintf("http://%s", bindAddress)
  279. w := WorkerStatus{
  280. ID: "test_worker_cmd",
  281. URL: workerBaseURL + "/cmd",
  282. }
  283. resp, err := PostJSON(baseURL+"/workers", w, nil)
  284. So(err, ShouldBeNil)
  285. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  286. go func() {
  287. // run the mock worker server
  288. workerServer.Run(bindAddress)
  289. }()
  290. time.Sleep(50 * time.Millisecond)
  291. // verify the worker mock server is running
  292. workerResp, err := http.Get(workerBaseURL + "/ping")
  293. So(err, ShouldBeNil)
  294. defer workerResp.Body.Close()
  295. So(workerResp.StatusCode, ShouldEqual, http.StatusOK)
  296. Convey("when client send wrong cmd", func(ctx C) {
  297. clientCmd := ClientCmd{
  298. Cmd: CmdStart,
  299. MirrorID: "ubuntu-sync",
  300. WorkerID: "not_exist_worker",
  301. }
  302. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  303. So(err, ShouldBeNil)
  304. defer resp.Body.Close()
  305. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  306. })
  307. Convey("when client send correct cmd", func(ctx C) {
  308. clientCmd := ClientCmd{
  309. Cmd: CmdStart,
  310. MirrorID: "ubuntu-sync",
  311. WorkerID: w.ID,
  312. }
  313. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  314. So(err, ShouldBeNil)
  315. defer resp.Body.Close()
  316. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  317. time.Sleep(50 * time.Microsecond)
  318. select {
  319. case cmd := <-cmdChan:
  320. ctx.So(cmd.Cmd, ShouldEqual, clientCmd.Cmd)
  321. ctx.So(cmd.MirrorID, ShouldEqual, clientCmd.MirrorID)
  322. default:
  323. ctx.So(0, ShouldEqual, 1)
  324. }
  325. })
  326. })
  327. })
  328. })
  329. }
  330. type mockDBAdapter struct {
  331. workerStore map[string]WorkerStatus
  332. statusStore map[string]MirrorStatus
  333. }
  334. func (b *mockDBAdapter) Init() error {
  335. return nil
  336. }
  337. func (b *mockDBAdapter) ListWorkers() ([]WorkerStatus, error) {
  338. workers := make([]WorkerStatus, len(b.workerStore))
  339. idx := 0
  340. for _, w := range b.workerStore {
  341. workers[idx] = w
  342. idx++
  343. }
  344. return workers, nil
  345. }
  346. func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
  347. w, ok := b.workerStore[workerID]
  348. if !ok {
  349. return WorkerStatus{}, fmt.Errorf("invalid workerId")
  350. }
  351. return w, nil
  352. }
  353. func (b *mockDBAdapter) DeleteWorker(workerID string) error {
  354. delete(b.workerStore, workerID)
  355. return nil
  356. }
  357. func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  358. // _, ok := b.workerStore[w.ID]
  359. // if ok {
  360. // return workerStatus{}, fmt.Errorf("duplicate worker name")
  361. // }
  362. b.workerStore[w.ID] = w
  363. return w, nil
  364. }
  365. func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
  366. id := mirrorID + "/" + workerID
  367. status, ok := b.statusStore[id]
  368. if !ok {
  369. return MirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
  370. }
  371. return status, nil
  372. }
  373. func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  374. // if _, ok := b.workerStore[workerID]; !ok {
  375. // // unregistered worker
  376. // return MirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID)
  377. // }
  378. id := mirrorID + "/" + workerID
  379. b.statusStore[id] = status
  380. return status, nil
  381. }
  382. func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]MirrorStatus, error) {
  383. var mirrorStatusList []MirrorStatus
  384. // simulating a database fail
  385. if workerID == _magicBadWorkerID {
  386. return []MirrorStatus{}, fmt.Errorf("database fail")
  387. }
  388. for k, v := range b.statusStore {
  389. if wID := strings.Split(k, "/")[1]; wID == workerID {
  390. mirrorStatusList = append(mirrorStatusList, v)
  391. }
  392. }
  393. return mirrorStatusList, nil
  394. }
  395. func (b *mockDBAdapter) ListAllMirrorStatus() ([]MirrorStatus, error) {
  396. var mirrorStatusList []MirrorStatus
  397. for _, v := range b.statusStore {
  398. mirrorStatusList = append(mirrorStatusList, v)
  399. }
  400. return mirrorStatusList, nil
  401. }
  402. func (b *mockDBAdapter) Close() error {
  403. return nil
  404. }
  405. func (b *mockDBAdapter) FlushDisabledJobs() error {
  406. return nil
  407. }
  408. func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine {
  409. r := gin.Default()
  410. r.GET("/ping", func(c *gin.Context) {
  411. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  412. })
  413. r.POST("/cmd", func(c *gin.Context) {
  414. var cmd WorkerCmd
  415. c.BindJSON(&cmd)
  416. cmdChan <- cmd
  417. })
  418. return r
  419. }