db.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "github.com/boltdb/bolt"
  7. . "github.com/tuna/tunasync/internal"
  8. )
  9. type dbAdapter interface {
  10. Init() error
  11. ListWorkers() ([]WorkerStatus, error)
  12. GetWorker(workerID string) (WorkerStatus, error)
  13. CreateWorker(w WorkerStatus) (WorkerStatus, error)
  14. UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
  15. GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
  16. ListMirrorStatus(workerID string) ([]MirrorStatus, error)
  17. ListAllMirrorStatus() ([]MirrorStatus, error)
  18. Close() error
  19. }
  20. func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
  21. if dbType == "bolt" {
  22. innerDB, err := bolt.Open(dbFile, 0600, nil)
  23. if err != nil {
  24. return nil, err
  25. }
  26. db := boltAdapter{
  27. db: innerDB,
  28. dbFile: dbFile,
  29. }
  30. err = db.Init()
  31. return &db, err
  32. }
  33. // unsupported db-type
  34. return nil, fmt.Errorf("unsupported db-type: %s", dbType)
  35. }
  36. const (
  37. _workerBucketKey = "workers"
  38. _statusBucketKey = "mirror_status"
  39. )
  40. type boltAdapter struct {
  41. db *bolt.DB
  42. dbFile string
  43. }
  44. func (b *boltAdapter) Init() (err error) {
  45. return b.db.Update(func(tx *bolt.Tx) error {
  46. _, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey))
  47. if err != nil {
  48. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  49. }
  50. _, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey))
  51. if err != nil {
  52. return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error())
  53. }
  54. return nil
  55. })
  56. }
  57. func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) {
  58. err = b.db.View(func(tx *bolt.Tx) error {
  59. bucket := tx.Bucket([]byte(_workerBucketKey))
  60. c := bucket.Cursor()
  61. var w WorkerStatus
  62. for k, v := c.First(); k != nil; k, v = c.Next() {
  63. jsonErr := json.Unmarshal(v, &w)
  64. if jsonErr != nil {
  65. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  66. continue
  67. }
  68. ws = append(ws, w)
  69. }
  70. return err
  71. })
  72. return
  73. }
  74. func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
  75. err = b.db.View(func(tx *bolt.Tx) error {
  76. bucket := tx.Bucket([]byte(_workerBucketKey))
  77. v := bucket.Get([]byte(workerID))
  78. if v == nil {
  79. return fmt.Errorf("invalid workerID %s", workerID)
  80. }
  81. err := json.Unmarshal(v, &w)
  82. return err
  83. })
  84. return
  85. }
  86. func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  87. err := b.db.Update(func(tx *bolt.Tx) error {
  88. bucket := tx.Bucket([]byte(_workerBucketKey))
  89. v, err := json.Marshal(w)
  90. if err != nil {
  91. return err
  92. }
  93. err = bucket.Put([]byte(w.ID), v)
  94. return err
  95. })
  96. return w, err
  97. }
  98. func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  99. id := mirrorID + "/" + workerID
  100. err := b.db.Update(func(tx *bolt.Tx) error {
  101. bucket := tx.Bucket([]byte(_statusBucketKey))
  102. v, err := json.Marshal(status)
  103. err = bucket.Put([]byte(id), v)
  104. return err
  105. })
  106. return status, err
  107. }
  108. func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
  109. id := mirrorID + "/" + workerID
  110. err = b.db.Update(func(tx *bolt.Tx) error {
  111. bucket := tx.Bucket([]byte(_statusBucketKey))
  112. v := bucket.Get([]byte(id))
  113. if v == nil {
  114. return fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
  115. }
  116. err := json.Unmarshal(v, &m)
  117. return err
  118. })
  119. return
  120. }
  121. func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
  122. err = b.db.View(func(tx *bolt.Tx) error {
  123. bucket := tx.Bucket([]byte(_statusBucketKey))
  124. c := bucket.Cursor()
  125. var m MirrorStatus
  126. for k, v := c.First(); k != nil; k, v = c.Next() {
  127. if wID := strings.Split(string(k), "/")[1]; wID == workerID {
  128. jsonErr := json.Unmarshal(v, &m)
  129. if jsonErr != nil {
  130. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  131. continue
  132. }
  133. ms = append(ms, m)
  134. }
  135. }
  136. return err
  137. })
  138. return
  139. }
  140. func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
  141. err = b.db.View(func(tx *bolt.Tx) error {
  142. bucket := tx.Bucket([]byte(_statusBucketKey))
  143. c := bucket.Cursor()
  144. var m MirrorStatus
  145. for k, v := c.First(); k != nil; k, v = c.Next() {
  146. jsonErr := json.Unmarshal(v, &m)
  147. if jsonErr != nil {
  148. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  149. continue
  150. }
  151. ms = append(ms, m)
  152. }
  153. return err
  154. })
  155. return
  156. }
  157. func (b *boltAdapter) Close() error {
  158. if b.db != nil {
  159. return b.db.Close()
  160. }
  161. return nil
  162. }