db.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "github.com/boltdb/bolt"
  7. . "github.com/tuna/tunasync/internal"
  8. )
  9. type dbAdapter interface {
  10. Init() error
  11. ListWorkers() ([]WorkerStatus, error)
  12. GetWorker(workerID string) (WorkerStatus, error)
  13. CreateWorker(w WorkerStatus) (WorkerStatus, error)
  14. UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
  15. GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
  16. ListMirrorStatus(workerID string) ([]MirrorStatus, error)
  17. ListAllMirrorStatus() ([]MirrorStatus, error)
  18. FlushDisabledJobs() error
  19. Close() error
  20. }
  21. func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
  22. if dbType == "bolt" {
  23. innerDB, err := bolt.Open(dbFile, 0600, nil)
  24. if err != nil {
  25. return nil, err
  26. }
  27. db := boltAdapter{
  28. db: innerDB,
  29. dbFile: dbFile,
  30. }
  31. err = db.Init()
  32. return &db, err
  33. }
  34. // unsupported db-type
  35. return nil, fmt.Errorf("unsupported db-type: %s", dbType)
  36. }
  37. const (
  38. _workerBucketKey = "workers"
  39. _statusBucketKey = "mirror_status"
  40. )
  41. type boltAdapter struct {
  42. db *bolt.DB
  43. dbFile string
  44. }
  45. func (b *boltAdapter) Init() (err error) {
  46. return b.db.Update(func(tx *bolt.Tx) error {
  47. _, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey))
  48. if err != nil {
  49. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  50. }
  51. _, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey))
  52. if err != nil {
  53. return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error())
  54. }
  55. return nil
  56. })
  57. }
  58. func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) {
  59. err = b.db.View(func(tx *bolt.Tx) error {
  60. bucket := tx.Bucket([]byte(_workerBucketKey))
  61. c := bucket.Cursor()
  62. var w WorkerStatus
  63. for k, v := c.First(); k != nil; k, v = c.Next() {
  64. jsonErr := json.Unmarshal(v, &w)
  65. if jsonErr != nil {
  66. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  67. continue
  68. }
  69. ws = append(ws, w)
  70. }
  71. return err
  72. })
  73. return
  74. }
  75. func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
  76. err = b.db.View(func(tx *bolt.Tx) error {
  77. bucket := tx.Bucket([]byte(_workerBucketKey))
  78. v := bucket.Get([]byte(workerID))
  79. if v == nil {
  80. return fmt.Errorf("invalid workerID %s", workerID)
  81. }
  82. err := json.Unmarshal(v, &w)
  83. return err
  84. })
  85. return
  86. }
  87. func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  88. err := b.db.Update(func(tx *bolt.Tx) error {
  89. bucket := tx.Bucket([]byte(_workerBucketKey))
  90. v, err := json.Marshal(w)
  91. if err != nil {
  92. return err
  93. }
  94. err = bucket.Put([]byte(w.ID), v)
  95. return err
  96. })
  97. return w, err
  98. }
  99. func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  100. id := mirrorID + "/" + workerID
  101. err := b.db.Update(func(tx *bolt.Tx) error {
  102. bucket := tx.Bucket([]byte(_statusBucketKey))
  103. v, err := json.Marshal(status)
  104. err = bucket.Put([]byte(id), v)
  105. return err
  106. })
  107. return status, err
  108. }
  109. func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
  110. id := mirrorID + "/" + workerID
  111. err = b.db.Update(func(tx *bolt.Tx) error {
  112. bucket := tx.Bucket([]byte(_statusBucketKey))
  113. v := bucket.Get([]byte(id))
  114. if v == nil {
  115. return fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
  116. }
  117. err := json.Unmarshal(v, &m)
  118. return err
  119. })
  120. return
  121. }
  122. func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
  123. err = b.db.View(func(tx *bolt.Tx) error {
  124. bucket := tx.Bucket([]byte(_statusBucketKey))
  125. c := bucket.Cursor()
  126. var m MirrorStatus
  127. for k, v := c.First(); k != nil; k, v = c.Next() {
  128. if wID := strings.Split(string(k), "/")[1]; wID == workerID {
  129. jsonErr := json.Unmarshal(v, &m)
  130. if jsonErr != nil {
  131. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  132. continue
  133. }
  134. ms = append(ms, m)
  135. }
  136. }
  137. return err
  138. })
  139. return
  140. }
  141. func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
  142. err = b.db.View(func(tx *bolt.Tx) error {
  143. bucket := tx.Bucket([]byte(_statusBucketKey))
  144. c := bucket.Cursor()
  145. var m MirrorStatus
  146. for k, v := c.First(); k != nil; k, v = c.Next() {
  147. jsonErr := json.Unmarshal(v, &m)
  148. if jsonErr != nil {
  149. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  150. continue
  151. }
  152. ms = append(ms, m)
  153. }
  154. return err
  155. })
  156. return
  157. }
  158. func (b *boltAdapter) FlushDisabledJobs() (err error) {
  159. err = b.db.Update(func(tx *bolt.Tx) error {
  160. bucket := tx.Bucket([]byte(_statusBucketKey))
  161. c := bucket.Cursor()
  162. var m MirrorStatus
  163. for k, v := c.First(); k != nil; k, v = c.Next() {
  164. jsonErr := json.Unmarshal(v, &m)
  165. if jsonErr != nil {
  166. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  167. continue
  168. }
  169. if m.Status == Disabled || len(m.Name) == 0 {
  170. err = c.Delete()
  171. }
  172. }
  173. return err
  174. })
  175. return
  176. }
  177. func (b *boltAdapter) Close() error {
  178. if b.db != nil {
  179. return b.db.Close()
  180. }
  181. return nil
  182. }