123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527 |
- package manager
- import (
- "encoding/json"
- "fmt"
- "io/ioutil"
- "math/rand"
- "net/http"
- "strings"
- "sync/atomic"
- "testing"
- "time"
- "github.com/gin-gonic/gin"
- . "github.com/smartystreets/goconvey/convey"
- . "github.com/tuna/tunasync/internal"
- )
// _magicBadWorkerID is the worker ID for which mockDBAdapter.ListMirrorStatus
// reports a simulated database failure.
const _magicBadWorkerID = "magic_bad_worker_id"
- func TestHTTPServer(t *testing.T) {
- var listenPort = 5000
- Convey("HTTP server should work", t, func(ctx C) {
- listenPort++
- port := listenPort
- addr := "127.0.0.1"
- baseURL := fmt.Sprintf("http://%s:%d", addr, port)
- InitLogger(true, true, false)
- s := GetTUNASyncManager(&Config{Debug: true})
- s.cfg.Server.Addr = addr
- s.cfg.Server.Port = port
- So(s, ShouldNotBeNil)
- s.setDBAdapter(&mockDBAdapter{
- workerStore: map[string]WorkerStatus{
- _magicBadWorkerID: WorkerStatus{
- ID: _magicBadWorkerID,
- }},
- statusStore: make(map[string]MirrorStatus),
- })
- go s.Run()
- time.Sleep(50 * time.Millisecond)
- resp, err := http.Get(baseURL + "/ping")
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- So(resp.Header.Get("Content-Type"), ShouldEqual, "application/json; charset=utf-8")
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- So(err, ShouldBeNil)
- var p map[string]string
- err = json.Unmarshal(body, &p)
- So(err, ShouldBeNil)
- So(p[_infoKey], ShouldEqual, "pong")
- Convey("when database fail", func(ctx C) {
- resp, err := http.Get(fmt.Sprintf("%s/workers/%s/jobs", baseURL, _magicBadWorkerID))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
- defer resp.Body.Close()
- var msg map[string]string
- err = json.NewDecoder(resp.Body).Decode(&msg)
- So(err, ShouldBeNil)
- So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
- })
- Convey("when register multiple workers", func(ctx C) {
- N := 10
- var cnt uint32
- for i := 0; i < N; i++ {
- go func(id int) {
- w := WorkerStatus{
- ID: fmt.Sprintf("worker%d", id),
- }
- resp, err := PostJSON(baseURL+"/workers", w, nil)
- ctx.So(err, ShouldBeNil)
- ctx.So(resp.StatusCode, ShouldEqual, http.StatusOK)
- atomic.AddUint32(&cnt, 1)
- }(i)
- }
- time.Sleep(2 * time.Second)
- So(cnt, ShouldEqual, N)
- Convey("list all workers", func(ctx C) {
- resp, err := http.Get(baseURL + "/workers")
- So(err, ShouldBeNil)
- defer resp.Body.Close()
- var actualResponseObj []WorkerStatus
- err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
- So(err, ShouldBeNil)
- So(len(actualResponseObj), ShouldEqual, N+1)
- })
- })
- Convey("when register a worker", func(ctx C) {
- w := WorkerStatus{
- ID: "test_worker1",
- }
- resp, err := PostJSON(baseURL+"/workers", w, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- Convey("list all workers", func(ctx C) {
- resp, err := http.Get(baseURL + "/workers")
- So(err, ShouldBeNil)
- defer resp.Body.Close()
- var actualResponseObj []WorkerStatus
- err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
- So(err, ShouldBeNil)
- So(len(actualResponseObj), ShouldEqual, 2)
- })
- Convey("delete an existent worker", func(ctx C) {
- req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
- So(err, ShouldBeNil)
- clt := &http.Client{}
- resp, err := clt.Do(req)
- So(err, ShouldBeNil)
- defer resp.Body.Close()
- res := map[string]string{}
- err = json.NewDecoder(resp.Body).Decode(&res)
- So(err, ShouldBeNil)
- So(res[_infoKey], ShouldEqual, "deleted")
- })
- Convey("delete non-existent worker", func(ctx C) {
- invalidWorker := "test_worker233"
- req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
- So(err, ShouldBeNil)
- clt := &http.Client{}
- resp, err := clt.Do(req)
- So(err, ShouldBeNil)
- defer resp.Body.Close()
- res := map[string]string{}
- err = json.NewDecoder(resp.Body).Decode(&res)
- So(err, ShouldBeNil)
- So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
- })
- Convey("flush disabled jobs", func(ctx C) {
- req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
- So(err, ShouldBeNil)
- clt := &http.Client{}
- resp, err := clt.Do(req)
- So(err, ShouldBeNil)
- defer resp.Body.Close()
- res := map[string]string{}
- err = json.NewDecoder(resp.Body).Decode(&res)
- So(err, ShouldBeNil)
- So(res[_infoKey], ShouldEqual, "flushed")
- })
- Convey("update mirror status of a existed worker", func(ctx C) {
- status := MirrorStatus{
- Name: "arch-sync1",
- Worker: "test_worker1",
- IsMaster: true,
- Status: Success,
- Upstream: "mirrors.tuna.tsinghua.edu.cn",
- Size: "unknown",
- }
- resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
- So(err, ShouldBeNil)
- defer resp.Body.Close()
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- Convey("list mirror status of an existed worker", func(ctx C) {
- var ms []MirrorStatus
- resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
- m := ms[0]
- So(m.Name, ShouldEqual, status.Name)
- So(m.Worker, ShouldEqual, status.Worker)
- So(m.Status, ShouldEqual, status.Status)
- So(m.Upstream, ShouldEqual, status.Upstream)
- So(m.Size, ShouldEqual, status.Size)
- So(m.IsMaster, ShouldEqual, status.IsMaster)
- So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
- So(m.LastStarted.IsZero(), ShouldBeTrue) // hasn't been initialized yet
- So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
- })
- // start syncing
- status.Status = PreSyncing
- time.Sleep(1 * time.Second)
- resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
- So(err, ShouldBeNil)
- defer resp.Body.Close()
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- Convey("update mirror status to PreSync - starting sync", func(ctx C) {
- var ms []MirrorStatus
- resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
- m := ms[0]
- So(m.Name, ShouldEqual, status.Name)
- So(m.Worker, ShouldEqual, status.Worker)
- So(m.Status, ShouldEqual, status.Status)
- So(m.Upstream, ShouldEqual, status.Upstream)
- So(m.Size, ShouldEqual, status.Size)
- So(m.IsMaster, ShouldEqual, status.IsMaster)
- So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 3*time.Second)
- So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 1*time.Second)
- So(time.Now().Sub(m.LastStarted), ShouldBeLessThan, 2*time.Second)
- So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 3*time.Second)
- So(time.Now().Sub(m.LastEnded), ShouldBeGreaterThan, 1*time.Second)
- })
- Convey("list all job status of all workers", func(ctx C) {
- var ms []WebMirrorStatus
- resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- m := ms[0]
- So(m.Name, ShouldEqual, status.Name)
- So(m.Status, ShouldEqual, status.Status)
- So(m.Upstream, ShouldEqual, status.Upstream)
- So(m.Size, ShouldEqual, status.Size)
- So(m.IsMaster, ShouldEqual, status.IsMaster)
- So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 3*time.Second)
- So(time.Now().Sub(m.LastStarted.Time), ShouldBeLessThan, 2*time.Second)
- So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 3*time.Second)
- })
- Convey("Update size of a valid mirror", func(ctx C) {
- msg := struct {
- Name string `json:"name"`
- Size string `json:"size"`
- }{status.Name, "5GB"}
- url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
- resp, err := PostJSON(url, msg, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- Convey("Get new size of a mirror", func(ctx C) {
- var ms []MirrorStatus
- resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
- m := ms[0]
- So(m.Name, ShouldEqual, status.Name)
- So(m.Worker, ShouldEqual, status.Worker)
- So(m.Status, ShouldEqual, status.Status)
- So(m.Upstream, ShouldEqual, status.Upstream)
- So(m.Size, ShouldEqual, "5GB")
- So(m.IsMaster, ShouldEqual, status.IsMaster)
- So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 3*time.Second)
- So(time.Now().Sub(m.LastStarted), ShouldBeLessThan, 2*time.Second)
- So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 3*time.Second)
- })
- })
- Convey("Update schedule of valid mirrors", func(ctx C) {
- msg := MirrorSchedules{
- []MirrorSchedule{
- MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
- MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
- },
- }
- url := fmt.Sprintf("%s/workers/%s/schedules", baseURL, status.Worker)
- resp, err := PostJSON(url, msg, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- })
- Convey("Update size of an invalid mirror", func(ctx C) {
- msg := struct {
- Name string `json:"name"`
- Size string `json:"size"`
- }{"Invalid mirror", "5GB"}
- url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
- resp, err := PostJSON(url, msg, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
- })
- // what if status changed to failed
- status.Status = Failed
- time.Sleep(3 * time.Second)
- resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
- So(err, ShouldBeNil)
- defer resp.Body.Close()
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- Convey("What if syncing job failed", func(ctx C) {
- var ms []MirrorStatus
- resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- // err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
- m := ms[0]
- So(m.Name, ShouldEqual, status.Name)
- So(m.Worker, ShouldEqual, status.Worker)
- So(m.Status, ShouldEqual, status.Status)
- So(m.Upstream, ShouldEqual, status.Upstream)
- So(m.Size, ShouldEqual, status.Size)
- So(m.IsMaster, ShouldEqual, status.IsMaster)
- So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
- So(time.Now().Sub(m.LastStarted), ShouldBeGreaterThan, 3*time.Second)
- So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
- })
- })
- Convey("update mirror status of an inexisted worker", func(ctx C) {
- invalidWorker := "test_worker2"
- status := MirrorStatus{
- Name: "arch-sync2",
- Worker: invalidWorker,
- IsMaster: true,
- Status: Success,
- LastUpdate: time.Now(),
- LastStarted: time.Now(),
- LastEnded: time.Now(),
- Upstream: "mirrors.tuna.tsinghua.edu.cn",
- Size: "4GB",
- }
- resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
- baseURL, status.Worker, status.Name), status, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
- defer resp.Body.Close()
- var msg map[string]string
- err = json.NewDecoder(resp.Body).Decode(&msg)
- So(err, ShouldBeNil)
- So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
- })
- Convey("update schedule of an non-existent worker", func(ctx C) {
- invalidWorker := "test_worker2"
- sch := MirrorSchedules{
- []MirrorSchedule{
- MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
- MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
- },
- }
- resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/schedules",
- baseURL, invalidWorker), sch, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
- defer resp.Body.Close()
- var msg map[string]string
- err = json.NewDecoder(resp.Body).Decode(&msg)
- So(err, ShouldBeNil)
- So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
- })
- Convey("handle client command", func(ctx C) {
- cmdChan := make(chan WorkerCmd, 1)
- workerServer := makeMockWorkerServer(cmdChan)
- workerPort := rand.Intn(10000) + 30000
- bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort)
- workerBaseURL := fmt.Sprintf("http://%s", bindAddress)
- w := WorkerStatus{
- ID: "test_worker_cmd",
- URL: workerBaseURL + "/cmd",
- }
- resp, err := PostJSON(baseURL+"/workers", w, nil)
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- go func() {
- // run the mock worker server
- workerServer.Run(bindAddress)
- }()
- time.Sleep(50 * time.Millisecond)
- // verify the worker mock server is running
- workerResp, err := http.Get(workerBaseURL + "/ping")
- So(err, ShouldBeNil)
- defer workerResp.Body.Close()
- So(workerResp.StatusCode, ShouldEqual, http.StatusOK)
- Convey("when client send wrong cmd", func(ctx C) {
- clientCmd := ClientCmd{
- Cmd: CmdStart,
- MirrorID: "ubuntu-sync",
- WorkerID: "not_exist_worker",
- }
- resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
- So(err, ShouldBeNil)
- defer resp.Body.Close()
- So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
- })
- Convey("when client send correct cmd", func(ctx C) {
- clientCmd := ClientCmd{
- Cmd: CmdStart,
- MirrorID: "ubuntu-sync",
- WorkerID: w.ID,
- }
- resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
- So(err, ShouldBeNil)
- defer resp.Body.Close()
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- time.Sleep(50 * time.Microsecond)
- select {
- case cmd := <-cmdChan:
- ctx.So(cmd.Cmd, ShouldEqual, clientCmd.Cmd)
- ctx.So(cmd.MirrorID, ShouldEqual, clientCmd.MirrorID)
- default:
- ctx.So(0, ShouldEqual, 1)
- }
- })
- })
- })
- })
- }
- type mockDBAdapter struct {
- workerStore map[string]WorkerStatus
- statusStore map[string]MirrorStatus
- }
- func (b *mockDBAdapter) Init() error {
- return nil
- }
- func (b *mockDBAdapter) ListWorkers() ([]WorkerStatus, error) {
- workers := make([]WorkerStatus, len(b.workerStore))
- idx := 0
- for _, w := range b.workerStore {
- workers[idx] = w
- idx++
- }
- return workers, nil
- }
- func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
- w, ok := b.workerStore[workerID]
- if !ok {
- return WorkerStatus{}, fmt.Errorf("invalid workerId")
- }
- return w, nil
- }
- func (b *mockDBAdapter) DeleteWorker(workerID string) error {
- delete(b.workerStore, workerID)
- return nil
- }
- func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
- // _, ok := b.workerStore[w.ID]
- // if ok {
- // return workerStatus{}, fmt.Errorf("duplicate worker name")
- // }
- b.workerStore[w.ID] = w
- return w, nil
- }
- func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
- id := mirrorID + "/" + workerID
- status, ok := b.statusStore[id]
- if !ok {
- return MirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
- }
- return status, nil
- }
- func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
- // if _, ok := b.workerStore[workerID]; !ok {
- // // unregistered worker
- // return MirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID)
- // }
- id := mirrorID + "/" + workerID
- b.statusStore[id] = status
- return status, nil
- }
- func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]MirrorStatus, error) {
- var mirrorStatusList []MirrorStatus
- // simulating a database fail
- if workerID == _magicBadWorkerID {
- return []MirrorStatus{}, fmt.Errorf("database fail")
- }
- for k, v := range b.statusStore {
- if wID := strings.Split(k, "/")[1]; wID == workerID {
- mirrorStatusList = append(mirrorStatusList, v)
- }
- }
- return mirrorStatusList, nil
- }
- func (b *mockDBAdapter) ListAllMirrorStatus() ([]MirrorStatus, error) {
- var mirrorStatusList []MirrorStatus
- for _, v := range b.statusStore {
- mirrorStatusList = append(mirrorStatusList, v)
- }
- return mirrorStatusList, nil
- }
- func (b *mockDBAdapter) Close() error {
- return nil
- }
- func (b *mockDBAdapter) FlushDisabledJobs() error {
- return nil
- }
- func makeMockWorkerServer(cmdChan chan WorkerCmd) *gin.Engine {
- r := gin.Default()
- r.GET("/ping", func(c *gin.Context) {
- c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
- })
- r.POST("/cmd", func(c *gin.Context) {
- var cmd WorkerCmd
- c.BindJSON(&cmd)
- cmdChan <- cmd
- })
- return r
- }
|