db.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/boltdb/bolt"
  8. . "github.com/tuna/tunasync/internal"
  9. )
  10. type dbAdapter interface {
  11. Init() error
  12. ListWorkers() ([]WorkerStatus, error)
  13. GetWorker(workerID string) (WorkerStatus, error)
  14. DeleteWorker(workerID string) error
  15. CreateWorker(w WorkerStatus) (WorkerStatus, error)
  16. RefreshWorker(workerID string) (WorkerStatus, error)
  17. UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
  18. GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
  19. ListMirrorStatus(workerID string) ([]MirrorStatus, error)
  20. ListAllMirrorStatus() ([]MirrorStatus, error)
  21. FlushDisabledJobs() error
  22. Close() error
  23. }
  24. func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
  25. if dbType == "bolt" {
  26. innerDB, err := bolt.Open(dbFile, 0600, &bolt.Options{
  27. Timeout: 5 * time.Second,
  28. })
  29. if err != nil {
  30. return nil, err
  31. }
  32. db := boltAdapter{
  33. db: innerDB,
  34. dbFile: dbFile,
  35. }
  36. err = db.Init()
  37. return &db, err
  38. }
  39. // unsupported db-type
  40. return nil, fmt.Errorf("unsupported db-type: %s", dbType)
  41. }
  42. const (
  43. _workerBucketKey = "workers"
  44. _statusBucketKey = "mirror_status"
  45. )
  46. type boltAdapter struct {
  47. db *bolt.DB
  48. dbFile string
  49. }
  50. func (b *boltAdapter) Init() (err error) {
  51. return b.db.Update(func(tx *bolt.Tx) error {
  52. _, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey))
  53. if err != nil {
  54. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  55. }
  56. _, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey))
  57. if err != nil {
  58. return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error())
  59. }
  60. return nil
  61. })
  62. }
  63. func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) {
  64. err = b.db.View(func(tx *bolt.Tx) error {
  65. bucket := tx.Bucket([]byte(_workerBucketKey))
  66. c := bucket.Cursor()
  67. var w WorkerStatus
  68. for k, v := c.First(); k != nil; k, v = c.Next() {
  69. jsonErr := json.Unmarshal(v, &w)
  70. if jsonErr != nil {
  71. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  72. continue
  73. }
  74. ws = append(ws, w)
  75. }
  76. return err
  77. })
  78. return
  79. }
  80. func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
  81. err = b.db.View(func(tx *bolt.Tx) error {
  82. bucket := tx.Bucket([]byte(_workerBucketKey))
  83. v := bucket.Get([]byte(workerID))
  84. if v == nil {
  85. return fmt.Errorf("invalid workerID %s", workerID)
  86. }
  87. err := json.Unmarshal(v, &w)
  88. return err
  89. })
  90. return
  91. }
  92. func (b *boltAdapter) DeleteWorker(workerID string) (err error) {
  93. err = b.db.Update(func(tx *bolt.Tx) error {
  94. bucket := tx.Bucket([]byte(_workerBucketKey))
  95. v := bucket.Get([]byte(workerID))
  96. if v == nil {
  97. return fmt.Errorf("invalid workerID %s", workerID)
  98. }
  99. err := bucket.Delete([]byte(workerID))
  100. return err
  101. })
  102. return
  103. }
  104. func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  105. err := b.db.Update(func(tx *bolt.Tx) error {
  106. bucket := tx.Bucket([]byte(_workerBucketKey))
  107. v, err := json.Marshal(w)
  108. if err != nil {
  109. return err
  110. }
  111. err = bucket.Put([]byte(w.ID), v)
  112. return err
  113. })
  114. return w, err
  115. }
  116. func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
  117. w, err = b.GetWorker(workerID)
  118. if err == nil {
  119. w.LastOnline = time.Now()
  120. w, err = b.CreateWorker(w)
  121. }
  122. return w, err
  123. }
  124. func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  125. id := mirrorID + "/" + workerID
  126. err := b.db.Update(func(tx *bolt.Tx) error {
  127. bucket := tx.Bucket([]byte(_statusBucketKey))
  128. v, err := json.Marshal(status)
  129. err = bucket.Put([]byte(id), v)
  130. return err
  131. })
  132. return status, err
  133. }
  134. func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
  135. id := mirrorID + "/" + workerID
  136. err = b.db.Update(func(tx *bolt.Tx) error {
  137. bucket := tx.Bucket([]byte(_statusBucketKey))
  138. v := bucket.Get([]byte(id))
  139. if v == nil {
  140. return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
  141. }
  142. err := json.Unmarshal(v, &m)
  143. return err
  144. })
  145. return
  146. }
  147. func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
  148. err = b.db.View(func(tx *bolt.Tx) error {
  149. bucket := tx.Bucket([]byte(_statusBucketKey))
  150. c := bucket.Cursor()
  151. var m MirrorStatus
  152. for k, v := c.First(); k != nil; k, v = c.Next() {
  153. if wID := strings.Split(string(k), "/")[1]; wID == workerID {
  154. jsonErr := json.Unmarshal(v, &m)
  155. if jsonErr != nil {
  156. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  157. continue
  158. }
  159. ms = append(ms, m)
  160. }
  161. }
  162. return err
  163. })
  164. return
  165. }
  166. func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
  167. err = b.db.View(func(tx *bolt.Tx) error {
  168. bucket := tx.Bucket([]byte(_statusBucketKey))
  169. c := bucket.Cursor()
  170. var m MirrorStatus
  171. for k, v := c.First(); k != nil; k, v = c.Next() {
  172. jsonErr := json.Unmarshal(v, &m)
  173. if jsonErr != nil {
  174. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  175. continue
  176. }
  177. ms = append(ms, m)
  178. }
  179. return err
  180. })
  181. return
  182. }
  183. func (b *boltAdapter) FlushDisabledJobs() (err error) {
  184. err = b.db.Update(func(tx *bolt.Tx) error {
  185. bucket := tx.Bucket([]byte(_statusBucketKey))
  186. c := bucket.Cursor()
  187. var m MirrorStatus
  188. for k, v := c.First(); k != nil; k, v = c.Next() {
  189. jsonErr := json.Unmarshal(v, &m)
  190. if jsonErr != nil {
  191. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  192. continue
  193. }
  194. if m.Status == Disabled || len(m.Name) == 0 {
  195. err = c.Delete()
  196. }
  197. }
  198. return err
  199. })
  200. return
  201. }
  202. func (b *boltAdapter) Close() error {
  203. if b.db != nil {
  204. return b.db.Close()
  205. }
  206. return nil
  207. }