server_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "math/rand"
  7. "net/http"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. . "github.com/smartystreets/goconvey/convey"
  13. . "github.com/tuna/tunasync/internal"
  14. )
  15. const (
  16. _magicBadWorkerID = "magic_bad_worker_id"
  17. )
  18. func TestHTTPServer(t *testing.T) {
  19. Convey("HTTP server should work", t, func(ctx C) {
  20. InitLogger(true, true, false)
  21. s := GetTUNASyncManager(&Config{Debug: false})
  22. So(s, ShouldNotBeNil)
  23. s.setDBAdapter(&mockDBAdapter{
  24. workerStore: map[string]WorkerStatus{
  25. _magicBadWorkerID: WorkerStatus{
  26. ID: _magicBadWorkerID,
  27. }},
  28. statusStore: make(map[string]MirrorStatus),
  29. })
  30. port := rand.Intn(10000) + 20000
  31. baseURL := fmt.Sprintf("http://127.0.0.1:%d", port)
  32. go func() {
  33. s.engine.Run(fmt.Sprintf("127.0.0.1:%d", port))
  34. }()
  35. time.Sleep(50 * time.Microsecond)
  36. resp, err := http.Get(baseURL + "/ping")
  37. So(err, ShouldBeNil)
  38. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  39. So(resp.Header.Get("Content-Type"), ShouldEqual, "application/json; charset=utf-8")
  40. defer resp.Body.Close()
  41. body, err := ioutil.ReadAll(resp.Body)
  42. So(err, ShouldBeNil)
  43. var p map[string]string
  44. err = json.Unmarshal(body, &p)
  45. So(err, ShouldBeNil)
  46. So(p[_infoKey], ShouldEqual, "pong")
  47. Convey("when database fail", func(ctx C) {
  48. resp, err := http.Get(fmt.Sprintf("%s/workers/%s/jobs", baseURL, _magicBadWorkerID))
  49. So(err, ShouldBeNil)
  50. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  51. defer resp.Body.Close()
  52. var msg map[string]string
  53. err = json.NewDecoder(resp.Body).Decode(&msg)
  54. So(err, ShouldBeNil)
  55. So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
  56. })
  57. Convey("when register a worker", func(ctx C) {
  58. w := WorkerStatus{
  59. ID: "test_worker1",
  60. }
  61. resp, err := PostJSON(baseURL+"/workers", w, nil)
  62. So(err, ShouldBeNil)
  63. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  64. Convey("list all workers", func(ctx C) {
  65. resp, err := http.Get(baseURL + "/workers")
  66. So(err, ShouldBeNil)
  67. defer resp.Body.Close()
  68. var actualResponseObj []WorkerStatus
  69. err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
  70. So(err, ShouldBeNil)
  71. So(len(actualResponseObj), ShouldEqual, 2)
  72. })
  73. Convey("flush disabled jobs", func(ctx C) {
  74. req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
  75. So(err, ShouldBeNil)
  76. clt := &http.Client{}
  77. resp, err := clt.Do(req)
  78. So(err, ShouldBeNil)
  79. defer resp.Body.Close()
  80. res := map[string]string{}
  81. err = json.NewDecoder(resp.Body).Decode(&res)
  82. So(err, ShouldBeNil)
  83. So(res[_infoKey], ShouldEqual, "flushed")
  84. })
  85. Convey("update mirror status of a existed worker", func(ctx C) {
  86. status := MirrorStatus{
  87. Name: "arch-sync1",
  88. Worker: "test_worker1",
  89. IsMaster: true,
  90. Status: Success,
  91. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  92. Size: "unknown",
  93. }
  94. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  95. defer resp.Body.Close()
  96. So(err, ShouldBeNil)
  97. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  98. Convey("list mirror status of an existed worker", func(ctx C) {
  99. var ms []MirrorStatus
  100. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  101. So(err, ShouldBeNil)
  102. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  103. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  104. m := ms[0]
  105. So(m.Name, ShouldEqual, status.Name)
  106. So(m.Worker, ShouldEqual, status.Worker)
  107. So(m.Status, ShouldEqual, status.Status)
  108. So(m.Upstream, ShouldEqual, status.Upstream)
  109. So(m.Size, ShouldEqual, status.Size)
  110. So(m.IsMaster, ShouldEqual, status.IsMaster)
  111. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  112. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  113. })
  114. Convey("list all job status of all workers", func(ctx C) {
  115. var ms []WebMirrorStatus
  116. resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
  117. So(err, ShouldBeNil)
  118. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  119. m := ms[0]
  120. So(m.Name, ShouldEqual, status.Name)
  121. So(m.Status, ShouldEqual, status.Status)
  122. So(m.Upstream, ShouldEqual, status.Upstream)
  123. So(m.Size, ShouldEqual, status.Size)
  124. So(m.IsMaster, ShouldEqual, status.IsMaster)
  125. So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
  126. So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 1*time.Second)
  127. })
  128. Convey("Update size of a valid mirror", func(ctx C) {
  129. msg := struct {
  130. Name string `json:"name"`
  131. Size string `json:"size"`
  132. }{status.Name, "5GB"}
  133. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  134. resp, err := PostJSON(url, msg, nil)
  135. So(err, ShouldBeNil)
  136. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  137. Convey("Get new size of a mirror", func(ctx C) {
  138. var ms []MirrorStatus
  139. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  140. So(err, ShouldBeNil)
  141. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  142. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  143. m := ms[0]
  144. So(m.Name, ShouldEqual, status.Name)
  145. So(m.Worker, ShouldEqual, status.Worker)
  146. So(m.Status, ShouldEqual, status.Status)
  147. So(m.Upstream, ShouldEqual, status.Upstream)
  148. So(m.Size, ShouldEqual, "5GB")
  149. So(m.IsMaster, ShouldEqual, status.IsMaster)
  150. So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
  151. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  152. })
  153. })
  154. Convey("Update size of an invalid mirror", func(ctx C) {
  155. msg := struct {
  156. Name string `json:"name"`
  157. Size string `json:"size"`
  158. }{"Invalid mirror", "5GB"}
  159. url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
  160. resp, err := PostJSON(url, msg, nil)
  161. So(err, ShouldBeNil)
  162. So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
  163. })
  164. // what if status changed to failed
  165. status.Status = Failed
  166. time.Sleep(3 * time.Second)
  167. resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
  168. defer resp.Body.Close()
  169. So(err, ShouldBeNil)
  170. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  171. Convey("What if syncing job failed", func(ctx C) {
  172. var ms []MirrorStatus
  173. resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
  174. So(err, ShouldBeNil)
  175. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  176. // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
  177. m := ms[0]
  178. So(m.Name, ShouldEqual, status.Name)
  179. So(m.Worker, ShouldEqual, status.Worker)
  180. So(m.Status, ShouldEqual, status.Status)
  181. So(m.Upstream, ShouldEqual, status.Upstream)
  182. So(m.Size, ShouldEqual, status.Size)
  183. So(m.IsMaster, ShouldEqual, status.IsMaster)
  184. So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
  185. So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
  186. })
  187. })
  188. Convey("update mirror status of an inexisted worker", func(ctx C) {
  189. invalidWorker := "test_worker2"
  190. status := MirrorStatus{
  191. Name: "arch-sync2",
  192. Worker: invalidWorker,
  193. IsMaster: true,
  194. Status: Success,
  195. LastUpdate: time.Now(),
  196. LastEnded: time.Now(),
  197. Upstream: "mirrors.tuna.tsinghua.edu.cn",
  198. Size: "4GB",
  199. }
  200. resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
  201. baseURL, status.Worker, status.Name), status, nil)
  202. So(err, ShouldBeNil)
  203. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  204. defer resp.Body.Close()
  205. var msg map[string]string
  206. err = json.NewDecoder(resp.Body).Decode(&msg)
  207. So(err, ShouldBeNil)
  208. So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
  209. })
  210. Convey("handle client command", func(ctx C) {
  211. cmdChan := make(chan WorkerCmd, 1)
  212. workerServer := makeMockWorkerServer(cmdChan)
  213. workerPort := rand.Intn(10000) + 30000
  214. bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort)
  215. workerBaseURL := fmt.Sprintf("http://%s", bindAddress)
  216. w := WorkerStatus{
  217. ID: "test_worker_cmd",
  218. URL: workerBaseURL + "/cmd",
  219. }
  220. resp, err := PostJSON(baseURL+"/workers", w, nil)
  221. So(err, ShouldBeNil)
  222. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  223. go func() {
  224. // run the mock worker server
  225. workerServer.Run(bindAddress)
  226. }()
  227. time.Sleep(50 * time.Microsecond)
  228. // verify the worker mock server is running
  229. workerResp, err := http.Get(workerBaseURL + "/ping")
  230. defer workerResp.Body.Close()
  231. So(err, ShouldBeNil)
  232. So(workerResp.StatusCode, ShouldEqual, http.StatusOK)
  233. Convey("when client send wrong cmd", func(ctx C) {
  234. clientCmd := ClientCmd{
  235. Cmd: CmdStart,
  236. MirrorID: "ubuntu-sync",
  237. WorkerID: "not_exist_worker",
  238. }
  239. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  240. defer resp.Body.Close()
  241. So(err, ShouldBeNil)
  242. So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
  243. })
  244. Convey("when client send correct cmd", func(ctx C) {
  245. clientCmd := ClientCmd{
  246. Cmd: CmdStart,
  247. MirrorID: "ubuntu-sync",
  248. WorkerID: w.ID,
  249. }
  250. resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
  251. defer resp.Body.Close()
  252. So(err, ShouldBeNil)
  253. So(resp.StatusCode, ShouldEqual, http.StatusOK)
  254. time.Sleep(50 * time.Microsecond)
  255. select {
  256. case cmd := <-cmdChan:
  257. ctx.So(cmd.Cmd, ShouldEqual, clientCmd.Cmd)
  258. ctx.So(cmd.MirrorID, ShouldEqual, clientCmd.MirrorID)
  259. default:
  260. ctx.So(0, ShouldEqual, 1)
  261. }
  262. })
  263. })
  264. })
  265. })
  266. }
  267. type mockDBAdapter struct {
  268. workerStore map[string]WorkerStatus
  269. statusStore map[string]MirrorStatus
  270. }
  271. func (b *mockDBAdapter) Init() error {
  272. return nil
  273. }
  274. func (b *mockDBAdapter) ListWorkers() ([]WorkerStatus, error) {
  275. workers := make([]WorkerStatus, len(b.workerStore))
  276. idx := 0
  277. for _, w := range b.workerStore {
  278. workers[idx] = w
  279. idx++
  280. }
  281. return workers, nil
  282. }
  283. func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
  284. w, ok := b.workerStore[workerID]
  285. if !ok {
  286. return WorkerStatus{}, fmt.Errorf("invalid workerId")
  287. }
  288. return w, nil
  289. }
  290. func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  291. // _, ok := b.workerStore[w.ID]
  292. // if ok {
  293. // return workerStatus{}, fmt.Errorf("duplicate worker name")
  294. // }
  295. b.workerStore[w.ID] = w
  296. return w, nil
  297. }
  298. func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
  299. id := mirrorID + "/" + workerID
  300. status, ok := b.statusStore[id]
  301. if !ok {
  302. return MirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
  303. }
  304. return status, nil
  305. }
  306. func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  307. // if _, ok := b.workerStore[workerID]; !ok {
  308. // // unregistered worker
  309. // return MirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID)
  310. // }
  311. id := mirrorID + "/" + workerID
  312. b.statusStore[id] = status
  313. return status, nil
  314. }
  315. func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]MirrorStatus, error) {
  316. var mirrorStatusList []MirrorStatus
  317. // simulating a database fail
  318. if workerID == _magicBadWorkerID {
  319. return []MirrorStatus{}, fmt.Errorf("database fail")
  320. }
  321. for k, v := range b.statusStore {
  322. if wID := strings.Split(k, "/")[1]; wID == workerID {
  323. mirrorStatusList = append(mirrorStatusList, v)
  324. }
  325. }
  326. return mirrorStatusList, nil
  327. }
  328. func (b *mockDBAdapter) ListAllMirrorStatus() ([]MirrorStatus, error) {
  329. var mirrorStatusList []MirrorStatus
  330. for _, v := range b.statusStore {
  331. mirrorStatusList = append(mirrorStatusList, v)
  332. }
  333. return mirrorStatusList, nil
  334. }
  335. func (b *mockDBAdapter) Close() error {
  336. return nil
  337. }
  338. func (b *mockDBAdapter) FlushDisabledJobs() error {
  339. return nil
  340. }
  341. func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine {
  342. r := gin.Default()
  343. r.GET("/ping", func(c *gin.Context) {
  344. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  345. })
  346. r.POST("/cmd", func(c *gin.Context) {
  347. var cmd WorkerCmd
  348. c.BindJSON(&cmd)
  349. cmdChan <- cmd
  350. })
  351. return r
  352. }