123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- package manager
- import (
- "encoding/json"
- "fmt"
- "io/ioutil"
- "math/rand"
- "net/http"
- "strings"
- "testing"
- "time"
- "github.com/gin-gonic/gin"
- . "github.com/smartystreets/goconvey/convey"
- . "github.com/tuna/tunasync/internal"
- )
// _magicBadWorkerID is the worker ID for which mockDBAdapter.ListMirrorStatus
// simulates a database failure.
const _magicBadWorkerID = "magic_bad_worker_id"
- func TestHTTPServer(t *testing.T) {
- Convey("HTTP server should work", t, func(ctx C) {
- InitLogger(true, true, false)
- s := GetTUNASyncManager(&Config{Debug: false})
- So(s, ShouldNotBeNil)
- s.setDBAdapter(&mockDBAdapter{
- workerStore: map[string]WorkerStatus{
- _magicBadWorkerID: WorkerStatus{
- ID: _magicBadWorkerID,
- }},
- statusStore: make(map[string]MirrorStatus),
- })
- port := rand.Intn(10000) + 20000
- baseURL := fmt.Sprintf("http://127.0.0.1:%d", port)
- go func() {
- s.engine.Run(fmt.Sprintf("127.0.0.1:%d", port))
- }()
- time.Sleep(50 * time.Microsecond)
- resp, err := http.Get(baseURL + "/ping")
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- So(resp.Header.Get("Content-Type"), ShouldEqual, "application/json; charset=utf-8")
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- So(err, ShouldBeNil)
- var p map[string]string
- err = json.Unmarshal(body, &p)
- So(err, ShouldBeNil)
- So(p[_infoKey], ShouldEqual, "pong")
- Convey("when database fail", func(ctx C) {
- resp, err := http.Get(fmt.Sprintf("%s/workers/%s/jobs", baseURL, _magicBadWorkerID))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
- defer resp.Body.Close()
- var msg map[string]string
- err = json.NewDecoder(resp.Body).Decode(&msg)
- So(err, ShouldBeNil)
- So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
- })
- Convey("when register a worker", func(ctx C) {
- w := WorkerStatus{
- ID: "test_worker1",
- }
- resp, err := PostJSON(baseURL+"/workers", w, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- Convey("list all workers", func(ctx C) {
- So(err, ShouldBeNil)
- resp, err := http.Get(baseURL + "/workers")
- So(err, ShouldBeNil)
- defer resp.Body.Close()
- var actualResponseObj []WorkerStatus
- err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
- So(err, ShouldBeNil)
- So(len(actualResponseObj), ShouldEqual, 2)
- })
- Convey("update mirror status of a existed worker", func(ctx C) {
- status := MirrorStatus{
- Name: "arch-sync1",
- Worker: "test_worker1",
- IsMaster: true,
- Status: Success,
- Upstream: "mirrors.tuna.tsinghua.edu.cn",
- Size: "3GB",
- }
- resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
- defer resp.Body.Close()
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- Convey("list mirror status of an existed worker", func(ctx C) {
- var ms []MirrorStatus
- resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
- m := ms[0]
- So(m.Name, ShouldEqual, status.Name)
- So(m.Worker, ShouldEqual, status.Worker)
- So(m.Status, ShouldEqual, status.Status)
- So(m.Upstream, ShouldEqual, status.Upstream)
- So(m.Size, ShouldEqual, status.Size)
- So(m.IsMaster, ShouldEqual, status.IsMaster)
- So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
- })
- Convey("list all job status of all workers", func(ctx C) {
- var ms []webMirrorStatus
- resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- m := ms[0]
- So(m.Name, ShouldEqual, status.Name)
- So(m.Status, ShouldEqual, status.Status)
- So(m.Upstream, ShouldEqual, status.Upstream)
- So(m.Size, ShouldEqual, status.Size)
- So(m.IsMaster, ShouldEqual, status.IsMaster)
- So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
- })
- })
- Convey("update mirror status of an inexisted worker", func(ctx C) {
- invalidWorker := "test_worker2"
- status := MirrorStatus{
- Name: "arch-sync2",
- Worker: invalidWorker,
- IsMaster: true,
- Status: Success,
- LastUpdate: time.Now(),
- Upstream: "mirrors.tuna.tsinghua.edu.cn",
- Size: "4GB",
- }
- resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
- baseURL, status.Worker, status.Name), status, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
- defer resp.Body.Close()
- var msg map[string]string
- err = json.NewDecoder(resp.Body).Decode(&msg)
- So(err, ShouldBeNil)
- So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
- })
- Convey("handle client command", func(ctx C) {
- cmdChan := make(chan WorkerCmd, 1)
- workerServer := makeMockWorkerServer(cmdChan)
- workerPort := rand.Intn(10000) + 30000
- bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort)
- workerBaseURL := fmt.Sprintf("http://%s", bindAddress)
- w := WorkerStatus{
- ID: "test_worker_cmd",
- URL: workerBaseURL + "/cmd",
- }
- resp, err := PostJSON(baseURL+"/workers", w, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- go func() {
- // run the mock worker server
- workerServer.Run(bindAddress)
- }()
- time.Sleep(50 * time.Microsecond)
- // verify the worker mock server is running
- workerResp, err := http.Get(workerBaseURL + "/ping")
- defer workerResp.Body.Close()
- So(err, ShouldBeNil)
- So(workerResp.StatusCode, ShouldEqual, http.StatusOK)
- Convey("when client send wrong cmd", func(ctx C) {
- clientCmd := ClientCmd{
- Cmd: CmdStart,
- MirrorID: "ubuntu-sync",
- WorkerID: "not_exist_worker",
- }
- resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
- defer resp.Body.Close()
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
- })
- Convey("when client send correct cmd", func(ctx C) {
- clientCmd := ClientCmd{
- Cmd: CmdStart,
- MirrorID: "ubuntu-sync",
- WorkerID: w.ID,
- }
- resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
- defer resp.Body.Close()
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- time.Sleep(50 * time.Microsecond)
- select {
- case cmd := <-cmdChan:
- ctx.So(cmd.Cmd, ShouldEqual, clientCmd.Cmd)
- ctx.So(cmd.MirrorID, ShouldEqual, clientCmd.MirrorID)
- default:
- ctx.So(0, ShouldEqual, 1)
- }
- })
- })
- })
- })
- }
- type mockDBAdapter struct {
- workerStore map[string]WorkerStatus
- statusStore map[string]MirrorStatus
- }
- func (b *mockDBAdapter) Init() error {
- return nil
- }
- func (b *mockDBAdapter) ListWorkers() ([]WorkerStatus, error) {
- workers := make([]WorkerStatus, len(b.workerStore))
- idx := 0
- for _, w := range b.workerStore {
- workers[idx] = w
- idx++
- }
- return workers, nil
- }
- func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
- w, ok := b.workerStore[workerID]
- if !ok {
- return WorkerStatus{}, fmt.Errorf("invalid workerId")
- }
- return w, nil
- }
- func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
- // _, ok := b.workerStore[w.ID]
- // if ok {
- // return workerStatus{}, fmt.Errorf("duplicate worker name")
- // }
- b.workerStore[w.ID] = w
- return w, nil
- }
- func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
- id := mirrorID + "/" + workerID
- status, ok := b.statusStore[id]
- if !ok {
- return MirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
- }
- return status, nil
- }
- func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
- // if _, ok := b.workerStore[workerID]; !ok {
- // // unregistered worker
- // return MirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID)
- // }
- id := mirrorID + "/" + workerID
- b.statusStore[id] = status
- return status, nil
- }
- func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]MirrorStatus, error) {
- var mirrorStatusList []MirrorStatus
- // simulating a database fail
- if workerID == _magicBadWorkerID {
- return []MirrorStatus{}, fmt.Errorf("database fail")
- }
- for k, v := range b.statusStore {
- if wID := strings.Split(k, "/")[1]; wID == workerID {
- mirrorStatusList = append(mirrorStatusList, v)
- }
- }
- return mirrorStatusList, nil
- }
- func (b *mockDBAdapter) ListAllMirrorStatus() ([]MirrorStatus, error) {
- var mirrorStatusList []MirrorStatus
- for _, v := range b.statusStore {
- mirrorStatusList = append(mirrorStatusList, v)
- }
- return mirrorStatusList, nil
- }
- func (b *mockDBAdapter) Close() error {
- return nil
- }
- func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine {
- r := gin.Default()
- r.GET("/ping", func(c *gin.Context) {
- c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
- })
- r.POST("/cmd", func(c *gin.Context) {
- var cmd WorkerCmd
- c.BindJSON(&cmd)
- cmdChan <- cmd
- })
- return r
- }
|