2
0

server_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "math/rand"
  7. "net/http"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. . "github.com/smartystreets/goconvey/convey"
  13. . "github.com/tuna/tunasync/internal"
  14. )
  15. const (
  16. _magicBadWorkerID = "magic_bad_worker_id"
  17. )
  18. func TestHTTPServer(t *testing.T) {
  19. Convey("HTTP server should work", t, func(ctx C) {
  20. InitLogger(true, true, false)
  21. s := GetTUNASyncManager(&Config{Debug: false})
  22. So(s, ShouldNotBeNil)
  23. s.setDBAdapter(&mockDBAdapter{
  24. workerStore: map[string]WorkerStatus{
  25. _magicBadWorkerID: WorkerStatus{
  26. ID: _magicBadWorkerID,
  27. }},
  28. statusStore: make(map[string]MirrorStatus),
  29. })
  30. port := rand.Intn(10000) + 20000
  31. baseURL := fmt.Sprintf("http://127.0.0.1:%d", port)
  32. go func() {
  33. s.engine.Run(fmt.Sprintf("127.0.0.1:%d", port))
  34. }()
  35. time.Sleep(50 * time.Microsecond)
  36. resp, err := http.Get(baseURL + "/ping")
  37. So(err, ShouldBeNil)
  38. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  39. So(resp.Header.Get("Content-Type"), ShouldEqual, "application/json; charset=utf-8")
  40. defer resp.Body.Close()
  41. body, err := ioutil.ReadAll(resp.Body)
  42. So(err, ShouldBeNil)
  43. var p map[string]string
  44. err = json.Unmarshal(body, &p)
  45. So(err, ShouldBeNil)
  46. So(p[_infoKey], ShouldEqual, "pong")
  47. Convey("when database fail", func(ctx C) {
  48. resp, err := http.Get(fmt.Sprintf("%s/workers/%s/jobs", baseURL, _magicBadWorkerID))
  49. So(err, ShouldBeNil)
  50. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  51. defer resp.Body.Close()
  52. var msg map[string]string
  53. err = json.NewDecoder(resp.Body).Decode(&msg)
  54. So(err, ShouldBeNil)
  55. So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
  56. })
  57. Convey("when register a worker", func(ctx C) {
  58. w := WorkerStatus{
  59. ID: "test_worker1",
  60. }
  61. resp, err := PostJSON(baseURL+"/workers", w, nil)
  62. So(err, ShouldBeNil)
  63. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  64. Convey("list all workers", func(ctx C) {
  65. resp, err := http.Get(baseURL + "/workers")
  66. So(err, ShouldBeNil)
  67. defer resp.Body.Close()
  68. var actualResponseObj []WorkerStatus
  69. err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
  70. So(err, ShouldBeNil)
  71. So(len(actualResponseObj), ShouldEqual, 2)
  72. })
  73. Convey("delete an existent worker", func(ctx C) {
  74. req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
  75. So(err, ShouldBeNil)
  76. clt := &http.Client{}
  77. resp, err := clt.Do(req)
  78. So(err, ShouldBeNil)
  79. defer resp.Body.Close()
  80. res := map[string]string{}
  81. err = json.NewDecoder(resp.Body).Decode(&res)
  82. So(err, ShouldBeNil)
  83. So(res[_infoKey], ShouldEqual, "deleted")
  84. })
  85. Convey("delete non-existent worker", func(ctx C) {
  86. invalidWorker := "test_worker233"
  87. req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
  88. So(err, ShouldBeNil)
  89. clt := &http.Client{}
  90. resp, err := clt.Do(req)
  91. So(err, ShouldBeNil)
  92. defer resp.Body.Close()
  93. res := map[string]string{}
  94. err = json.NewDecoder(resp.Body).Decode(&res)
  95. So(err, ShouldBeNil)
  96. So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  97. })
  98. Convey("flush disabled jobs", func(ctx C) {
  99. req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
  100. So(err, ShouldBeNil)
  101. clt := &http.Client{}
  102. resp, err := clt.Do(req)
  103. So(err, ShouldBeNil)
  104. defer resp.Body.Close()
  105. res := map[string]string{}
  106. err = json.NewDecoder(resp.Body).Decode(&res)
  107. So(err, ShouldBeNil)
  108. So(res[_infoKey], ShouldEqual, "flushed")
  109. })
  110. Convey("update mirror status of a existed worker", func(ctx C) {
  111. status := MirrorStatus{
  112. Name: "arch-sync1",
  113. Worker: "test_worker1",
  114. IsMaster: true,
  115. Status: Success,
  116. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  117. Size: "unknown",
  118. }
  119. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  120. defer resp.Body.Close()
  121. So(err, ShouldBeNil)
  122. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  123. Convey("list mirror status of an existed worker", func(ctx C) {
  124. var ms []MirrorStatus
  125. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  126. So(err, ShouldBeNil)
  127. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  128. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  129. m := ms[0]
  130. So(m.Name, ShouldEqual, status.Name)
  131. So(m.Worker, ShouldEqual, status.Worker)
  132. So(m.Status, ShouldEqual, status.Status)
  133. So(m.Upstream, ShouldEqual, status.Upstream)
  134. So(m.Size, ShouldEqual, status.Size)
  135. So(m.IsMaster, ShouldEqual, status.IsMaster)
  136. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  137. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  138. })
  139. Convey("list all job status of all workers", func(ctx C) {
  140. var ms []WebMirrorStatus
  141. resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
  142. So(err, ShouldBeNil)
  143. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  144. m := ms[0]
  145. So(m.Name, ShouldEqual, status.Name)
  146. So(m.Status, ShouldEqual, status.Status)
  147. So(m.Upstream, ShouldEqual, status.Upstream)
  148. So(m.Size, ShouldEqual, status.Size)
  149. So(m.IsMaster, ShouldEqual, status.IsMaster)
  150. So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
  151. So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 1*time.Second)
  152. })
  153. Convey("Update size of a valid mirror", func(ctx C) {
  154. msg := struct {
  155. Name string `json:"name"`
  156. Size string `json:"size"`
  157. }{status.Name, "5GB"}
  158. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  159. resp, err := PostJSON(url, msg, nil)
  160. So(err, ShouldBeNil)
  161. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  162. Convey("Get new size of a mirror", func(ctx C) {
  163. var ms []MirrorStatus
  164. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  165. So(err, ShouldBeNil)
  166. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  167. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  168. m := ms[0]
  169. So(m.Name, ShouldEqual, status.Name)
  170. So(m.Worker, ShouldEqual, status.Worker)
  171. So(m.Status, ShouldEqual, status.Status)
  172. So(m.Upstream, ShouldEqual, status.Upstream)
  173. So(m.Size, ShouldEqual, "5GB")
  174. So(m.IsMaster, ShouldEqual, status.IsMaster)
  175. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  176. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  177. })
  178. })
  179. Convey("Update size of an invalid mirror", func(ctx C) {
  180. msg := struct {
  181. Name string `json:"name"`
  182. Size string `json:"size"`
  183. }{"Invalid mirror", "5GB"}
  184. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  185. resp, err := PostJSON(url, msg, nil)
  186. So(err, ShouldBeNil)
  187. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  188. })
  189. // what if status changed to failed
  190. status.Status = Failed
  191. time.Sleep(3 * time.Second)
  192. resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  193. defer resp.Body.Close()
  194. So(err, ShouldBeNil)
  195. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  196. Convey("What if syncing job failed", func(ctx C) {
  197. var ms []MirrorStatus
  198. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  199. So(err, ShouldBeNil)
  200. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  201. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  202. m := ms[0]
  203. So(m.Name, ShouldEqual, status.Name)
  204. So(m.Worker, ShouldEqual, status.Worker)
  205. So(m.Status, ShouldEqual, status.Status)
  206. So(m.Upstream, ShouldEqual, status.Upstream)
  207. So(m.Size, ShouldEqual, status.Size)
  208. So(m.IsMaster, ShouldEqual, status.IsMaster)
  209. So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
  210. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  211. })
  212. })
  213. Convey("update mirror status of an inexisted worker", func(ctx C) {
  214. invalidWorker := "test_worker2"
  215. status := MirrorStatus{
  216. Name: "arch-sync2",
  217. Worker: invalidWorker,
  218. IsMaster: true,
  219. Status: Success,
  220. LastUpdate: time.Now(),
  221. LastEnded: time.Now(),
  222. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  223. Size: "4GB",
  224. }
  225. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
  226. baseURL, status.Worker, status.Name), status, nil)
  227. So(err, ShouldBeNil)
  228. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  229. defer resp.Body.Close()
  230. var msg map[string]string
  231. err = json.NewDecoder(resp.Body).Decode(&msg)
  232. So(err, ShouldBeNil)
  233. So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  234. })
  235. Convey("handle client command", func(ctx C) {
  236. cmdChan := make(chan WorkerCmd, 1)
  237. workerServer := makeMockWorkerServer(cmdChan)
  238. workerPort := rand.Intn(10000) + 30000
  239. bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort)
  240. workerBaseURL := fmt.Sprintf("http://%s", bindAddress)
  241. w := WorkerStatus{
  242. ID: "test_worker_cmd",
  243. URL: workerBaseURL + "/cmd",
  244. }
  245. resp, err := PostJSON(baseURL+"/workers", w, nil)
  246. So(err, ShouldBeNil)
  247. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  248. go func() {
  249. // run the mock worker server
  250. workerServer.Run(bindAddress)
  251. }()
  252. time.Sleep(50 * time.Microsecond)
  253. // verify the worker mock server is running
  254. workerResp, err := http.Get(workerBaseURL + "/ping")
  255. defer workerResp.Body.Close()
  256. So(err, ShouldBeNil)
  257. So(workerResp.StatusCode, ShouldEqual, http.StatusOK)
  258. Convey("when client send wrong cmd", func(ctx C) {
  259. clientCmd := ClientCmd{
  260. Cmd: CmdStart,
  261. MirrorID: "ubuntu-sync",
  262. WorkerID: "not_exist_worker",
  263. }
  264. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  265. defer resp.Body.Close()
  266. So(err, ShouldBeNil)
  267. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  268. })
  269. Convey("when client send correct cmd", func(ctx C) {
  270. clientCmd := ClientCmd{
  271. Cmd: CmdStart,
  272. MirrorID: "ubuntu-sync",
  273. WorkerID: w.ID,
  274. }
  275. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  276. defer resp.Body.Close()
  277. So(err, ShouldBeNil)
  278. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  279. time.Sleep(50 * time.Microsecond)
  280. select {
  281. case cmd := <-cmdChan:
  282. ctx.So(cmd.Cmd, ShouldEqual, clientCmd.Cmd)
  283. ctx.So(cmd.MirrorID, ShouldEqual, clientCmd.MirrorID)
  284. default:
  285. ctx.So(0, ShouldEqual, 1)
  286. }
  287. })
  288. })
  289. })
  290. })
  291. }
  292. type mockDBAdapter struct {
  293. workerStore map[string]WorkerStatus
  294. statusStore map[string]MirrorStatus
  295. }
  296. func (b *mockDBAdapter) Init() error {
  297. return nil
  298. }
  299. func (b *mockDBAdapter) ListWorkers() ([]WorkerStatus, error) {
  300. workers := make([]WorkerStatus, len(b.workerStore))
  301. idx := 0
  302. for _, w := range b.workerStore {
  303. workers[idx] = w
  304. idx++
  305. }
  306. return workers, nil
  307. }
  308. func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
  309. w, ok := b.workerStore[workerID]
  310. if !ok {
  311. return WorkerStatus{}, fmt.Errorf("invalid workerId")
  312. }
  313. return w, nil
  314. }
  315. func (b *mockDBAdapter) DeleteWorker(workerID string) error {
  316. delete(b.workerStore, workerID)
  317. return nil
  318. }
  319. func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  320. // _, ok := b.workerStore[w.ID]
  321. // if ok {
  322. // return workerStatus{}, fmt.Errorf("duplicate worker name")
  323. // }
  324. b.workerStore[w.ID] = w
  325. return w, nil
  326. }
  327. func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
  328. id := mirrorID + "/" + workerID
  329. status, ok := b.statusStore[id]
  330. if !ok {
  331. return MirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
  332. }
  333. return status, nil
  334. }
  335. func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  336. // if _, ok := b.workerStore[workerID]; !ok {
  337. // // unregistered worker
  338. // return MirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID)
  339. // }
  340. id := mirrorID + "/" + workerID
  341. b.statusStore[id] = status
  342. return status, nil
  343. }
  344. func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]MirrorStatus, error) {
  345. var mirrorStatusList []MirrorStatus
  346. // simulating a database fail
  347. if workerID == _magicBadWorkerID {
  348. return []MirrorStatus{}, fmt.Errorf("database fail")
  349. }
  350. for k, v := range b.statusStore {
  351. if wID := strings.Split(k, "/")[1]; wID == workerID {
  352. mirrorStatusList = append(mirrorStatusList, v)
  353. }
  354. }
  355. return mirrorStatusList, nil
  356. }
  357. func (b *mockDBAdapter) ListAllMirrorStatus() ([]MirrorStatus, error) {
  358. var mirrorStatusList []MirrorStatus
  359. for _, v := range b.statusStore {
  360. mirrorStatusList = append(mirrorStatusList, v)
  361. }
  362. return mirrorStatusList, nil
  363. }
  364. func (b *mockDBAdapter) Close() error {
  365. return nil
  366. }
  367. func (b *mockDBAdapter) FlushDisabledJobs() error {
  368. return nil
  369. }
  370. func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine {
  371. r := gin.Default()
  372. r.GET("/ping", func(c *gin.Context) {
  373. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  374. })
  375. r.POST("/cmd", func(c *gin.Context) {
  376. var cmd WorkerCmd
  377. c.BindJSON(&cmd)
  378. cmdChan <- cmd
  379. })
  380. return r
  381. }