db.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "github.com/boltdb/bolt"
  7. )
  8. type dbAdapter interface {
  9. Init() error
  10. ListWorkers() ([]worker, error)
  11. GetWorker(workerID string) (worker, error)
  12. CreateWorker(w worker) (worker, error)
  13. UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error)
  14. GetMirrorStatus(workerID, mirrorID string) (mirrorStatus, error)
  15. ListMirrorStatus(workerID string) ([]mirrorStatus, error)
  16. ListAllMirrorStatus() ([]mirrorStatus, error)
  17. Close() error
  18. }
  19. func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
  20. if dbType == "bolt" {
  21. innerDB, err := bolt.Open(dbFile, 0600, nil)
  22. if err != nil {
  23. return nil, err
  24. }
  25. db := boltAdapter{
  26. db: innerDB,
  27. dbFile: dbFile,
  28. }
  29. err = db.Init()
  30. return &db, err
  31. }
  32. // unsupported db-type
  33. return nil, fmt.Errorf("unsupported db-type: %s", dbType)
  34. }
  35. const (
  36. _workerBucketKey = "workers"
  37. _statusBucketKey = "mirror_status"
  38. )
  39. type boltAdapter struct {
  40. db *bolt.DB
  41. dbFile string
  42. }
  43. func (b *boltAdapter) Init() (err error) {
  44. return b.db.Update(func(tx *bolt.Tx) error {
  45. _, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey))
  46. if err != nil {
  47. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  48. }
  49. _, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey))
  50. if err != nil {
  51. return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error())
  52. }
  53. return nil
  54. })
  55. }
  56. func (b *boltAdapter) ListWorkers() (ws []worker, err error) {
  57. err = b.db.View(func(tx *bolt.Tx) error {
  58. bucket := tx.Bucket([]byte(_workerBucketKey))
  59. c := bucket.Cursor()
  60. var w worker
  61. for k, v := c.First(); k != nil; k, v = c.Next() {
  62. jsonErr := json.Unmarshal(v, &w)
  63. if jsonErr != nil {
  64. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  65. continue
  66. }
  67. ws = append(ws, w)
  68. }
  69. return err
  70. })
  71. return
  72. }
  73. func (b *boltAdapter) GetWorker(workerID string) (w worker, err error) {
  74. err = b.db.View(func(tx *bolt.Tx) error {
  75. bucket := tx.Bucket([]byte(_workerBucketKey))
  76. v := bucket.Get([]byte(workerID))
  77. if v == nil {
  78. return fmt.Errorf("invalid workerID %s", workerID)
  79. }
  80. err := json.Unmarshal(v, &w)
  81. return err
  82. })
  83. return
  84. }
  85. func (b *boltAdapter) CreateWorker(w worker) (worker, error) {
  86. err := b.db.Update(func(tx *bolt.Tx) error {
  87. bucket := tx.Bucket([]byte(_workerBucketKey))
  88. v, err := json.Marshal(w)
  89. if err != nil {
  90. return err
  91. }
  92. err = bucket.Put([]byte(w.ID), v)
  93. return err
  94. })
  95. return w, err
  96. }
  97. func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error) {
  98. id := mirrorID + "/" + workerID
  99. err := b.db.Update(func(tx *bolt.Tx) error {
  100. bucket := tx.Bucket([]byte(_statusBucketKey))
  101. v, err := json.Marshal(status)
  102. err = bucket.Put([]byte(id), v)
  103. return err
  104. })
  105. return status, err
  106. }
  107. func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m mirrorStatus, err error) {
  108. id := mirrorID + "/" + workerID
  109. err = b.db.Update(func(tx *bolt.Tx) error {
  110. bucket := tx.Bucket([]byte(_statusBucketKey))
  111. v := bucket.Get([]byte(id))
  112. if v == nil {
  113. return fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
  114. }
  115. err := json.Unmarshal(v, &m)
  116. return err
  117. })
  118. return
  119. }
  120. func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []mirrorStatus, err error) {
  121. err = b.db.View(func(tx *bolt.Tx) error {
  122. bucket := tx.Bucket([]byte(_statusBucketKey))
  123. c := bucket.Cursor()
  124. var m mirrorStatus
  125. for k, v := c.First(); k != nil; k, v = c.Next() {
  126. if wID := strings.Split(string(k), "/")[1]; wID == workerID {
  127. jsonErr := json.Unmarshal(v, &m)
  128. if jsonErr != nil {
  129. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  130. continue
  131. }
  132. ms = append(ms, m)
  133. }
  134. }
  135. return err
  136. })
  137. return
  138. }
  139. func (b *boltAdapter) ListAllMirrorStatus() (ms []mirrorStatus, err error) {
  140. err = b.db.View(func(tx *bolt.Tx) error {
  141. bucket := tx.Bucket([]byte(_statusBucketKey))
  142. c := bucket.Cursor()
  143. var m mirrorStatus
  144. for k, v := c.First(); k != nil; k, v = c.Next() {
  145. jsonErr := json.Unmarshal(v, &m)
  146. if jsonErr != nil {
  147. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  148. continue
  149. }
  150. ms = append(ms, m)
  151. }
  152. return err
  153. })
  154. return
  155. }
  156. func (b *boltAdapter) Close() error {
  157. if b.db != nil {
  158. return b.db.Close()
  159. }
  160. return nil
  161. }