2
0

db.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "github.com/boltdb/bolt"
  7. . "github.com/tuna/tunasync/internal"
  8. )
  9. type dbAdapter interface {
  10. Init() error
  11. ListWorkers() ([]WorkerStatus, error)
  12. GetWorker(workerID string) (WorkerStatus, error)
  13. DeleteWorker(workerID string) error
  14. CreateWorker(w WorkerStatus) (WorkerStatus, error)
  15. UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
  16. GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
  17. ListMirrorStatus(workerID string) ([]MirrorStatus, error)
  18. ListAllMirrorStatus() ([]MirrorStatus, error)
  19. FlushDisabledJobs() error
  20. Close() error
  21. }
  22. func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
  23. if dbType == "bolt" {
  24. innerDB, err := bolt.Open(dbFile, 0600, nil)
  25. if err != nil {
  26. return nil, err
  27. }
  28. db := boltAdapter{
  29. db: innerDB,
  30. dbFile: dbFile,
  31. }
  32. err = db.Init()
  33. return &db, err
  34. }
  35. // unsupported db-type
  36. return nil, fmt.Errorf("unsupported db-type: %s", dbType)
  37. }
  // Names of the top-level buckets used by boltAdapter.
  const (
  	// _workerBucketKey names the bucket of worker records, keyed by worker ID.
  	_workerBucketKey = "workers"
  	// _statusBucketKey names the bucket of mirror status records, keyed by
  	// "<mirrorID>/<workerID>".
  	_statusBucketKey = "mirror_status"
  )
  42. type boltAdapter struct {
  43. db *bolt.DB
  44. dbFile string
  45. }
  46. func (b *boltAdapter) Init() (err error) {
  47. return b.db.Update(func(tx *bolt.Tx) error {
  48. _, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey))
  49. if err != nil {
  50. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  51. }
  52. _, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey))
  53. if err != nil {
  54. return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error())
  55. }
  56. return nil
  57. })
  58. }
  59. func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) {
  60. err = b.db.View(func(tx *bolt.Tx) error {
  61. bucket := tx.Bucket([]byte(_workerBucketKey))
  62. c := bucket.Cursor()
  63. var w WorkerStatus
  64. for k, v := c.First(); k != nil; k, v = c.Next() {
  65. jsonErr := json.Unmarshal(v, &w)
  66. if jsonErr != nil {
  67. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  68. continue
  69. }
  70. ws = append(ws, w)
  71. }
  72. return err
  73. })
  74. return
  75. }
  76. func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
  77. err = b.db.View(func(tx *bolt.Tx) error {
  78. bucket := tx.Bucket([]byte(_workerBucketKey))
  79. v := bucket.Get([]byte(workerID))
  80. if v == nil {
  81. return fmt.Errorf("invalid workerID %s", workerID)
  82. }
  83. err := json.Unmarshal(v, &w)
  84. return err
  85. })
  86. return
  87. }
  88. func (b *boltAdapter) DeleteWorker(workerID string) (err error) {
  89. err = b.db.Update(func(tx *bolt.Tx) error {
  90. bucket := tx.Bucket([]byte(_workerBucketKey))
  91. v := bucket.Get([]byte(workerID))
  92. if v == nil {
  93. return fmt.Errorf("invalid workerID %s", workerID)
  94. }
  95. err := bucket.Delete([]byte(workerID))
  96. return err
  97. })
  98. return
  99. }
  100. func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  101. err := b.db.Update(func(tx *bolt.Tx) error {
  102. bucket := tx.Bucket([]byte(_workerBucketKey))
  103. v, err := json.Marshal(w)
  104. if err != nil {
  105. return err
  106. }
  107. err = bucket.Put([]byte(w.ID), v)
  108. return err
  109. })
  110. return w, err
  111. }
  112. func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  113. id := mirrorID + "/" + workerID
  114. err := b.db.Update(func(tx *bolt.Tx) error {
  115. bucket := tx.Bucket([]byte(_statusBucketKey))
  116. v, err := json.Marshal(status)
  117. err = bucket.Put([]byte(id), v)
  118. return err
  119. })
  120. return status, err
  121. }
  122. func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
  123. id := mirrorID + "/" + workerID
  124. err = b.db.Update(func(tx *bolt.Tx) error {
  125. bucket := tx.Bucket([]byte(_statusBucketKey))
  126. v := bucket.Get([]byte(id))
  127. if v == nil {
  128. return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
  129. }
  130. err := json.Unmarshal(v, &m)
  131. return err
  132. })
  133. return
  134. }
  135. func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
  136. err = b.db.View(func(tx *bolt.Tx) error {
  137. bucket := tx.Bucket([]byte(_statusBucketKey))
  138. c := bucket.Cursor()
  139. var m MirrorStatus
  140. for k, v := c.First(); k != nil; k, v = c.Next() {
  141. if wID := strings.Split(string(k), "/")[1]; wID == workerID {
  142. jsonErr := json.Unmarshal(v, &m)
  143. if jsonErr != nil {
  144. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  145. continue
  146. }
  147. ms = append(ms, m)
  148. }
  149. }
  150. return err
  151. })
  152. return
  153. }
  154. func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
  155. err = b.db.View(func(tx *bolt.Tx) error {
  156. bucket := tx.Bucket([]byte(_statusBucketKey))
  157. c := bucket.Cursor()
  158. var m MirrorStatus
  159. for k, v := c.First(); k != nil; k, v = c.Next() {
  160. jsonErr := json.Unmarshal(v, &m)
  161. if jsonErr != nil {
  162. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  163. continue
  164. }
  165. ms = append(ms, m)
  166. }
  167. return err
  168. })
  169. return
  170. }
  171. func (b *boltAdapter) FlushDisabledJobs() (err error) {
  172. err = b.db.Update(func(tx *bolt.Tx) error {
  173. bucket := tx.Bucket([]byte(_statusBucketKey))
  174. c := bucket.Cursor()
  175. var m MirrorStatus
  176. for k, v := c.First(); k != nil; k, v = c.Next() {
  177. jsonErr := json.Unmarshal(v, &m)
  178. if jsonErr != nil {
  179. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  180. continue
  181. }
  182. if m.Status == Disabled || len(m.Name) == 0 {
  183. err = c.Delete()
  184. }
  185. }
  186. return err
  187. })
  188. return
  189. }
  190. func (b *boltAdapter) Close() error {
  191. if b.db != nil {
  192. return b.db.Close()
  193. }
  194. return nil
  195. }