server_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "math/rand"
  7. "net/http"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. . "github.com/smartystreets/goconvey/convey"
  13. . "github.com/tuna/tunasync/internal"
  14. )
  15. const (
  16. _magicBadWorkerID = "magic_bad_worker_id"
  17. )
  18. func TestHTTPServer(t *testing.T) {
  19. var listenPort = 5000
  20. Convey("HTTP server should work", t, func(ctx C) {
  21. listenPort++
  22. port := listenPort
  23. addr := "127.0.0.1"
  24. baseURL := fmt.Sprintf("http://%s:%d", addr, port)
  25. InitLogger(true, true, false)
  26. s := GetTUNASyncManager(&Config{Debug: true})
  27. s.cfg.Server.Addr = addr
  28. s.cfg.Server.Port = port
  29. So(s, ShouldNotBeNil)
  30. s.setDBAdapter(&mockDBAdapter{
  31. workerStore: map[string]WorkerStatus{
  32. _magicBadWorkerID: WorkerStatus{
  33. ID: _magicBadWorkerID,
  34. }},
  35. statusStore: make(map[string]MirrorStatus),
  36. })
  37. go s.Run()
  38. time.Sleep(50 * time.Millisecond)
  39. resp, err := http.Get(baseURL + "/ping")
  40. So(err, ShouldBeNil)
  41. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  42. So(resp.Header.Get("Content-Type"), ShouldEqual, "application/json; charset=utf-8")
  43. defer resp.Body.Close()
  44. body, err := ioutil.ReadAll(resp.Body)
  45. So(err, ShouldBeNil)
  46. var p map[string]string
  47. err = json.Unmarshal(body, &p)
  48. So(err, ShouldBeNil)
  49. So(p[_infoKey], ShouldEqual, "pong")
  50. Convey("when database fail", func(ctx C) {
  51. resp, err := http.Get(fmt.Sprintf("%s/workers/%s/jobs", baseURL, _magicBadWorkerID))
  52. So(err, ShouldBeNil)
  53. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  54. defer resp.Body.Close()
  55. var msg map[string]string
  56. err = json.NewDecoder(resp.Body).Decode(&msg)
  57. So(err, ShouldBeNil)
  58. So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
  59. })
  60. Convey("when register a worker", func(ctx C) {
  61. w := WorkerStatus{
  62. ID: "test_worker1",
  63. }
  64. resp, err := PostJSON(baseURL+"/workers", w, nil)
  65. So(err, ShouldBeNil)
  66. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  67. Convey("list all workers", func(ctx C) {
  68. resp, err := http.Get(baseURL + "/workers")
  69. So(err, ShouldBeNil)
  70. defer resp.Body.Close()
  71. var actualResponseObj []WorkerStatus
  72. err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
  73. So(err, ShouldBeNil)
  74. So(len(actualResponseObj), ShouldEqual, 2)
  75. })
  76. Convey("delete an existent worker", func(ctx C) {
  77. req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
  78. So(err, ShouldBeNil)
  79. clt := &http.Client{}
  80. resp, err := clt.Do(req)
  81. So(err, ShouldBeNil)
  82. defer resp.Body.Close()
  83. res := map[string]string{}
  84. err = json.NewDecoder(resp.Body).Decode(&res)
  85. So(err, ShouldBeNil)
  86. So(res[_infoKey], ShouldEqual, "deleted")
  87. })
  88. Convey("delete non-existent worker", func(ctx C) {
  89. invalidWorker := "test_worker233"
  90. req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
  91. So(err, ShouldBeNil)
  92. clt := &http.Client{}
  93. resp, err := clt.Do(req)
  94. So(err, ShouldBeNil)
  95. defer resp.Body.Close()
  96. res := map[string]string{}
  97. err = json.NewDecoder(resp.Body).Decode(&res)
  98. So(err, ShouldBeNil)
  99. So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  100. })
  101. Convey("flush disabled jobs", func(ctx C) {
  102. req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
  103. So(err, ShouldBeNil)
  104. clt := &http.Client{}
  105. resp, err := clt.Do(req)
  106. So(err, ShouldBeNil)
  107. defer resp.Body.Close()
  108. res := map[string]string{}
  109. err = json.NewDecoder(resp.Body).Decode(&res)
  110. So(err, ShouldBeNil)
  111. So(res[_infoKey], ShouldEqual, "flushed")
  112. })
  113. Convey("update mirror status of a existed worker", func(ctx C) {
  114. status := MirrorStatus{
  115. Name: "arch-sync1",
  116. Worker: "test_worker1",
  117. IsMaster: true,
  118. Status: Success,
  119. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  120. Size: "unknown",
  121. }
  122. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  123. defer resp.Body.Close()
  124. So(err, ShouldBeNil)
  125. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  126. Convey("list mirror status of an existed worker", func(ctx C) {
  127. var ms []MirrorStatus
  128. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  129. So(err, ShouldBeNil)
  130. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  131. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  132. m := ms[0]
  133. So(m.Name, ShouldEqual, status.Name)
  134. So(m.Worker, ShouldEqual, status.Worker)
  135. So(m.Status, ShouldEqual, status.Status)
  136. So(m.Upstream, ShouldEqual, status.Upstream)
  137. So(m.Size, ShouldEqual, status.Size)
  138. So(m.IsMaster, ShouldEqual, status.IsMaster)
  139. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  140. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  141. })
  142. Convey("list all job status of all workers", func(ctx C) {
  143. var ms []WebMirrorStatus
  144. resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
  145. So(err, ShouldBeNil)
  146. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  147. m := ms[0]
  148. So(m.Name, ShouldEqual, status.Name)
  149. So(m.Status, ShouldEqual, status.Status)
  150. So(m.Upstream, ShouldEqual, status.Upstream)
  151. So(m.Size, ShouldEqual, status.Size)
  152. So(m.IsMaster, ShouldEqual, status.IsMaster)
  153. So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
  154. So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 1*time.Second)
  155. })
  156. Convey("Update size of a valid mirror", func(ctx C) {
  157. msg := struct {
  158. Name string `json:"name"`
  159. Size string `json:"size"`
  160. }{status.Name, "5GB"}
  161. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  162. resp, err := PostJSON(url, msg, nil)
  163. So(err, ShouldBeNil)
  164. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  165. Convey("Get new size of a mirror", func(ctx C) {
  166. var ms []MirrorStatus
  167. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  168. So(err, ShouldBeNil)
  169. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  170. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  171. m := ms[0]
  172. So(m.Name, ShouldEqual, status.Name)
  173. So(m.Worker, ShouldEqual, status.Worker)
  174. So(m.Status, ShouldEqual, status.Status)
  175. So(m.Upstream, ShouldEqual, status.Upstream)
  176. So(m.Size, ShouldEqual, "5GB")
  177. So(m.IsMaster, ShouldEqual, status.IsMaster)
  178. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  179. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  180. })
  181. })
  182. Convey("Update size of an invalid mirror", func(ctx C) {
  183. msg := struct {
  184. Name string `json:"name"`
  185. Size string `json:"size"`
  186. }{"Invalid mirror", "5GB"}
  187. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  188. resp, err := PostJSON(url, msg, nil)
  189. So(err, ShouldBeNil)
  190. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  191. })
  192. // what if status changed to failed
  193. status.Status = Failed
  194. time.Sleep(3 * time.Second)
  195. resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  196. defer resp.Body.Close()
  197. So(err, ShouldBeNil)
  198. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  199. Convey("What if syncing job failed", func(ctx C) {
  200. var ms []MirrorStatus
  201. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  202. So(err, ShouldBeNil)
  203. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  204. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  205. m := ms[0]
  206. So(m.Name, ShouldEqual, status.Name)
  207. So(m.Worker, ShouldEqual, status.Worker)
  208. So(m.Status, ShouldEqual, status.Status)
  209. So(m.Upstream, ShouldEqual, status.Upstream)
  210. So(m.Size, ShouldEqual, status.Size)
  211. So(m.IsMaster, ShouldEqual, status.IsMaster)
  212. So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
  213. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  214. })
  215. })
  216. Convey("update mirror status of an inexisted worker", func(ctx C) {
  217. invalidWorker := "test_worker2"
  218. status := MirrorStatus{
  219. Name: "arch-sync2",
  220. Worker: invalidWorker,
  221. IsMaster: true,
  222. Status: Success,
  223. LastUpdate: time.Now(),
  224. LastEnded: time.Now(),
  225. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  226. Size: "4GB",
  227. }
  228. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
  229. baseURL, status.Worker, status.Name), status, nil)
  230. So(err, ShouldBeNil)
  231. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  232. defer resp.Body.Close()
  233. var msg map[string]string
  234. err = json.NewDecoder(resp.Body).Decode(&msg)
  235. So(err, ShouldBeNil)
  236. So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  237. })
  238. Convey("handle client command", func(ctx C) {
  239. cmdChan := make(chan WorkerCmd, 1)
  240. workerServer := makeMockWorkerServer(cmdChan)
  241. workerPort := rand.Intn(10000) + 30000
  242. bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort)
  243. workerBaseURL := fmt.Sprintf("http://%s", bindAddress)
  244. w := WorkerStatus{
  245. ID: "test_worker_cmd",
  246. URL: workerBaseURL + "/cmd",
  247. }
  248. resp, err := PostJSON(baseURL+"/workers", w, nil)
  249. So(err, ShouldBeNil)
  250. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  251. go func() {
  252. // run the mock worker server
  253. workerServer.Run(bindAddress)
  254. }()
  255. time.Sleep(50 * time.Microsecond)
  256. // verify the worker mock server is running
  257. workerResp, err := http.Get(workerBaseURL + "/ping")
  258. defer workerResp.Body.Close()
  259. So(err, ShouldBeNil)
  260. So(workerResp.StatusCode, ShouldEqual, http.StatusOK)
  261. Convey("when client send wrong cmd", func(ctx C) {
  262. clientCmd := ClientCmd{
  263. Cmd: CmdStart,
  264. MirrorID: "ubuntu-sync",
  265. WorkerID: "not_exist_worker",
  266. }
  267. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  268. defer resp.Body.Close()
  269. So(err, ShouldBeNil)
  270. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  271. })
  272. Convey("when client send correct cmd", func(ctx C) {
  273. clientCmd := ClientCmd{
  274. Cmd: CmdStart,
  275. MirrorID: "ubuntu-sync",
  276. WorkerID: w.ID,
  277. }
  278. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  279. defer resp.Body.Close()
  280. So(err, ShouldBeNil)
  281. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  282. time.Sleep(50 * time.Microsecond)
  283. select {
  284. case cmd := <-cmdChan:
  285. ctx.So(cmd.Cmd, ShouldEqual, clientCmd.Cmd)
  286. ctx.So(cmd.MirrorID, ShouldEqual, clientCmd.MirrorID)
  287. default:
  288. ctx.So(0, ShouldEqual, 1)
  289. }
  290. })
  291. })
  292. })
  293. })
  294. }
  295. type mockDBAdapter struct {
  296. workerStore map[string]WorkerStatus
  297. statusStore map[string]MirrorStatus
  298. }
  299. func (b *mockDBAdapter) Init() error {
  300. return nil
  301. }
  302. func (b *mockDBAdapter) ListWorkers() ([]WorkerStatus, error) {
  303. workers := make([]WorkerStatus, len(b.workerStore))
  304. idx := 0
  305. for _, w := range b.workerStore {
  306. workers[idx] = w
  307. idx++
  308. }
  309. return workers, nil
  310. }
  311. func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
  312. w, ok := b.workerStore[workerID]
  313. if !ok {
  314. return WorkerStatus{}, fmt.Errorf("invalid workerId")
  315. }
  316. return w, nil
  317. }
  318. func (b *mockDBAdapter) DeleteWorker(workerID string) error {
  319. delete(b.workerStore, workerID)
  320. return nil
  321. }
  322. func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  323. // _, ok := b.workerStore[w.ID]
  324. // if ok {
  325. // return workerStatus{}, fmt.Errorf("duplicate worker name")
  326. // }
  327. b.workerStore[w.ID] = w
  328. return w, nil
  329. }
  330. func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
  331. id := mirrorID + "/" + workerID
  332. status, ok := b.statusStore[id]
  333. if !ok {
  334. return MirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
  335. }
  336. return status, nil
  337. }
  338. func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  339. // if _, ok := b.workerStore[workerID]; !ok {
  340. // // unregistered worker
  341. // return MirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID)
  342. // }
  343. id := mirrorID + "/" + workerID
  344. b.statusStore[id] = status
  345. return status, nil
  346. }
  347. func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]MirrorStatus, error) {
  348. var mirrorStatusList []MirrorStatus
  349. // simulating a database fail
  350. if workerID == _magicBadWorkerID {
  351. return []MirrorStatus{}, fmt.Errorf("database fail")
  352. }
  353. for k, v := range b.statusStore {
  354. if wID := strings.Split(k, "/")[1]; wID == workerID {
  355. mirrorStatusList = append(mirrorStatusList, v)
  356. }
  357. }
  358. return mirrorStatusList, nil
  359. }
  360. func (b *mockDBAdapter) ListAllMirrorStatus() ([]MirrorStatus, error) {
  361. var mirrorStatusList []MirrorStatus
  362. for _, v := range b.statusStore {
  363. mirrorStatusList = append(mirrorStatusList, v)
  364. }
  365. return mirrorStatusList, nil
  366. }
  367. func (b *mockDBAdapter) Close() error {
  368. return nil
  369. }
  370. func (b *mockDBAdapter) FlushDisabledJobs() error {
  371. return nil
  372. }
  373. func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine {
  374. r := gin.Default()
  375. r.GET("/ping", func(c *gin.Context) {
  376. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  377. })
  378. r.POST("/cmd", func(c *gin.Context) {
  379. var cmd WorkerCmd
  380. c.BindJSON(&cmd)
  381. cmdChan <- cmd
  382. })
  383. return r
  384. }