Parcourir la source

feature(manager): implement db.go and tests

walkerning il y a 9 ans
Parent
commit
a11fbe2c58
3 fichiers modifiés avec 237 ajouts et 13 suppressions
  1. 116 13
      manager/db.go
  2. 117 0
      manager/db_test.go
  3. 4 0
      manager/server_test.go

+ 116 - 13
manager/db.go

@@ -1,11 +1,15 @@
 package manager
 
 import (
+	"encoding/json"
 	"fmt"
+	"strings"
+
 	"github.com/boltdb/bolt"
 )
 
 type dbAdapter interface {
+	Init() error
 	ListWorkers() ([]worker, error)
 	GetWorker(workerID string) (worker, error)
 	CreateWorker(w worker) (worker, error)
@@ -26,43 +30,142 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
 			db:     innerDB,
 			dbFile: dbFile,
 		}
-		return &db, nil
+		err = db.Init()
+		return &db, err
 	}
 	// unsupported db-type
 	return nil, fmt.Errorf("unsupported db-type: %s", dbType)
 }
 
+const (
+	_workerBucketKey = "workers"
+	_statusBucketKey = "mirror_status"
+)
+
 type boltAdapter struct {
 	db     *bolt.DB
 	dbFile string
 }
 
-func (b *boltAdapter) ListWorkers() ([]worker, error) {
-	return []worker{}, nil
+func (b *boltAdapter) Init() (err error) {
+	return b.db.Update(func(tx *bolt.Tx) error {
+		_, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey))
+		if err != nil {
+			return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
+		}
+		_, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey))
+		if err != nil {
+			return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error())
+		}
+		return nil
+	})
+}
+
+func (b *boltAdapter) ListWorkers() (ws []worker, err error) {
+	err = b.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(_workerBucketKey))
+		c := bucket.Cursor()
+		var w worker
+		for k, v := c.First(); k != nil; k, v = c.Next() {
+			jsonErr := json.Unmarshal(v, &w)
+			if jsonErr != nil {
+				err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
+				continue
+			}
+			ws = append(ws, w)
+		}
+		return err
+	})
+	return
 }
 
-func (b *boltAdapter) GetWorker(workerID string) (worker, error) {
-	return worker{}, nil
+func (b *boltAdapter) GetWorker(workerID string) (w worker, err error) {
+	err = b.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(_workerBucketKey))
+		v := bucket.Get([]byte(workerID))
+		if v == nil {
+			return fmt.Errorf("invalid workerID %s", workerID)
+		}
+		err := json.Unmarshal(v, &w)
+		return err
+	})
+	return
 }
 
 func (b *boltAdapter) CreateWorker(w worker) (worker, error) {
-	return worker{}, nil
+	err := b.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(_workerBucketKey))
+		v, err := json.Marshal(w)
+		if err != nil {
+			return err
+		}
+		err = bucket.Put([]byte(w.ID), v)
+		return err
+	})
+	return w, err
 }
 
 func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error) {
-	return mirrorStatus{}, nil
+	id := mirrorID + "/" + workerID
+	err := b.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(_statusBucketKey))
+		v, err := json.Marshal(status)
+		err = bucket.Put([]byte(id), v)
+		return err
+	})
+	return status, err
 }
 
-func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (mirrorStatus, error) {
-	return mirrorStatus{}, nil
+func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m mirrorStatus, err error) {
+	id := mirrorID + "/" + workerID
+	err = b.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(_statusBucketKey))
+		v := bucket.Get([]byte(id))
+		if v == nil {
+			return fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
+		}
+		err := json.Unmarshal(v, &m)
+		return err
+	})
+	return
 }
 
-func (b *boltAdapter) ListMirrorStatus(workerID string) ([]mirrorStatus, error) {
-	return []mirrorStatus{}, nil
+func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []mirrorStatus, err error) {
+	err = b.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(_statusBucketKey))
+		c := bucket.Cursor()
+		var m mirrorStatus
+		for k, v := c.First(); k != nil; k, v = c.Next() {
+			if wID := strings.Split(string(k), "/")[1]; wID == workerID {
+				jsonErr := json.Unmarshal(v, &m)
+				if jsonErr != nil {
+					err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
+					continue
+				}
+				ms = append(ms, m)
+			}
+		}
+		return err
+	})
+	return
 }
 
-func (b *boltAdapter) ListAllMirrorStatus() ([]mirrorStatus, error) {
-	return []mirrorStatus{}, nil
+func (b *boltAdapter) ListAllMirrorStatus() (ms []mirrorStatus, err error) {
+	err = b.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(_statusBucketKey))
+		c := bucket.Cursor()
+		var m mirrorStatus
+		for k, v := c.First(); k != nil; k, v = c.Next() {
+			jsonErr := json.Unmarshal(v, &m)
+			if jsonErr != nil {
+				err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
+				continue
+			}
+			ms = append(ms, m)
+		}
+		return err
+	})
+	return
 }
 
 func (b *boltAdapter) Close() error {

+ 117 - 0
manager/db_test.go

@@ -0,0 +1,117 @@
+package manager
+
+import (
+	"encoding/json"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	. "github.com/smartystreets/goconvey/convey"
+	. "github.com/tuna/tunasync/internal"
+)
+
+func TestBoltAdapter(t *testing.T) {
+	Convey("boltAdapter should work", t, func() {
+		tmpDir, err := ioutil.TempDir("", "tunasync")
+		defer os.RemoveAll(tmpDir)
+		So(err, ShouldBeNil)
+
+		dbType, dbFile := "bolt", filepath.Join(tmpDir, "bolt.db")
+		boltDB, err := makeDBAdapter(dbType, dbFile)
+		So(err, ShouldBeNil)
+
+		defer func() {
+			// close boltDB
+			err := boltDB.Close()
+			So(err, ShouldBeNil)
+		}()
+
+		testWorkerIDs := []string{"test_worker1", "test_worker2"}
+		Convey("create worker", func() {
+			for _, id := range testWorkerIDs {
+				w := worker{
+					ID:         id,
+					Token:      "token_" + id,
+					LastOnline: time.Now(),
+				}
+				w, err = boltDB.CreateWorker(w)
+				So(err, ShouldBeNil)
+			}
+
+			Convey("get exists worker", func() {
+				_, err := boltDB.GetWorker(testWorkerIDs[0])
+				So(err, ShouldBeNil)
+			})
+
+			Convey("list exist worker", func() {
+				ws, err := boltDB.ListWorkers()
+				So(err, ShouldBeNil)
+				So(len(ws), ShouldEqual, 2)
+			})
+
+			Convey("get inexist worker", func() {
+				_, err := boltDB.GetWorker("invalid workerID")
+				So(err, ShouldNotBeNil)
+			})
+		})
+
+		Convey("update mirror status", func() {
+			status1 := mirrorStatus{
+				Name:       "arch-sync1",
+				Worker:     testWorkerIDs[0],
+				IsMaster:   true,
+				Status:     Success,
+				LastUpdate: time.Now(),
+				Upstream:   "mirrors.tuna.tsinghua.edu.cn",
+				Size:       "3GB",
+			}
+			status2 := mirrorStatus{
+				Name:       "arch-sync2",
+				Worker:     testWorkerIDs[1],
+				IsMaster:   true,
+				Status:     Success,
+				LastUpdate: time.Now(),
+				Upstream:   "mirrors.tuna.tsinghua.edu.cn",
+				Size:       "4GB",
+			}
+
+			_, err := boltDB.UpdateMirrorStatus(status1.Worker, status1.Name, status1)
+			_, err = boltDB.UpdateMirrorStatus(status2.Worker, status2.Name, status2)
+			So(err, ShouldBeNil)
+
+			Convey("get mirror status", func() {
+				m, err := boltDB.GetMirrorStatus(testWorkerIDs[0], status1.Name)
+				So(err, ShouldBeNil)
+				expectedJSON, err := json.Marshal(status1)
+				So(err, ShouldBeNil)
+				actualJSON, err := json.Marshal(m)
+				So(err, ShouldBeNil)
+				So(string(actualJSON), ShouldEqual, string(expectedJSON))
+			})
+
+			Convey("list mirror status", func() {
+				ms, err := boltDB.ListMirrorStatus(testWorkerIDs[0])
+				So(err, ShouldBeNil)
+				expectedJSON, err := json.Marshal([]mirrorStatus{status1})
+				So(err, ShouldBeNil)
+				actualJSON, err := json.Marshal(ms)
+				So(err, ShouldBeNil)
+				So(string(actualJSON), ShouldEqual, string(expectedJSON))
+			})
+
+			Convey("list all mirror status", func() {
+				ms, err := boltDB.ListAllMirrorStatus()
+				So(err, ShouldBeNil)
+				expectedJSON, err := json.Marshal([]mirrorStatus{status1, status2})
+				So(err, ShouldBeNil)
+				actualJSON, err := json.Marshal(ms)
+				So(err, ShouldBeNil)
+				So(string(actualJSON), ShouldEqual, string(expectedJSON))
+			})
+
+		})
+
+	})
+}

+ 4 - 0
manager/server_test.go

@@ -157,6 +157,10 @@ type mockDBAdapter struct {
 	statusStore map[string]mirrorStatus
 }
 
+func (b *mockDBAdapter) Init() error {
+	return nil
+}
+
 func (b *mockDBAdapter) ListWorkers() ([]worker, error) {
 	workers := make([]worker, len(b.workerStore))
 	idx := 0