2
0

db_bolt.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/boltdb/bolt"
  8. . "github.com/tuna/tunasync/internal"
  9. )
  10. type boltAdapter struct {
  11. db *bolt.DB
  12. dbFile string
  13. }
  14. func (b *boltAdapter) Init() (err error) {
  15. return b.db.Update(func(tx *bolt.Tx) error {
  16. _, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey))
  17. if err != nil {
  18. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  19. }
  20. _, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey))
  21. if err != nil {
  22. return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error())
  23. }
  24. return nil
  25. })
  26. }
  27. func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) {
  28. err = b.db.View(func(tx *bolt.Tx) error {
  29. bucket := tx.Bucket([]byte(_workerBucketKey))
  30. c := bucket.Cursor()
  31. var w WorkerStatus
  32. for k, v := c.First(); k != nil; k, v = c.Next() {
  33. jsonErr := json.Unmarshal(v, &w)
  34. if jsonErr != nil {
  35. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  36. continue
  37. }
  38. ws = append(ws, w)
  39. }
  40. return err
  41. })
  42. return
  43. }
  44. func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
  45. err = b.db.View(func(tx *bolt.Tx) error {
  46. bucket := tx.Bucket([]byte(_workerBucketKey))
  47. v := bucket.Get([]byte(workerID))
  48. if v == nil {
  49. return fmt.Errorf("invalid workerID %s", workerID)
  50. }
  51. err := json.Unmarshal(v, &w)
  52. return err
  53. })
  54. return
  55. }
  56. func (b *boltAdapter) DeleteWorker(workerID string) (err error) {
  57. err = b.db.Update(func(tx *bolt.Tx) error {
  58. bucket := tx.Bucket([]byte(_workerBucketKey))
  59. v := bucket.Get([]byte(workerID))
  60. if v == nil {
  61. return fmt.Errorf("invalid workerID %s", workerID)
  62. }
  63. err := bucket.Delete([]byte(workerID))
  64. return err
  65. })
  66. return
  67. }
  68. func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  69. err := b.db.Update(func(tx *bolt.Tx) error {
  70. bucket := tx.Bucket([]byte(_workerBucketKey))
  71. v, err := json.Marshal(w)
  72. if err != nil {
  73. return err
  74. }
  75. err = bucket.Put([]byte(w.ID), v)
  76. return err
  77. })
  78. return w, err
  79. }
  80. func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
  81. w, err = b.GetWorker(workerID)
  82. if err == nil {
  83. w.LastOnline = time.Now()
  84. w, err = b.CreateWorker(w)
  85. }
  86. return w, err
  87. }
  88. func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  89. id := mirrorID + "/" + workerID
  90. err := b.db.Update(func(tx *bolt.Tx) error {
  91. bucket := tx.Bucket([]byte(_statusBucketKey))
  92. v, err := json.Marshal(status)
  93. err = bucket.Put([]byte(id), v)
  94. return err
  95. })
  96. return status, err
  97. }
  98. func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
  99. id := mirrorID + "/" + workerID
  100. err = b.db.Update(func(tx *bolt.Tx) error {
  101. bucket := tx.Bucket([]byte(_statusBucketKey))
  102. v := bucket.Get([]byte(id))
  103. if v == nil {
  104. return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
  105. }
  106. err := json.Unmarshal(v, &m)
  107. return err
  108. })
  109. return
  110. }
  111. func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
  112. err = b.db.View(func(tx *bolt.Tx) error {
  113. bucket := tx.Bucket([]byte(_statusBucketKey))
  114. c := bucket.Cursor()
  115. var m MirrorStatus
  116. for k, v := c.First(); k != nil; k, v = c.Next() {
  117. if wID := strings.Split(string(k), "/")[1]; wID == workerID {
  118. jsonErr := json.Unmarshal(v, &m)
  119. if jsonErr != nil {
  120. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  121. continue
  122. }
  123. ms = append(ms, m)
  124. }
  125. }
  126. return err
  127. })
  128. return
  129. }
  130. func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
  131. err = b.db.View(func(tx *bolt.Tx) error {
  132. bucket := tx.Bucket([]byte(_statusBucketKey))
  133. c := bucket.Cursor()
  134. var m MirrorStatus
  135. for k, v := c.First(); k != nil; k, v = c.Next() {
  136. jsonErr := json.Unmarshal(v, &m)
  137. if jsonErr != nil {
  138. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  139. continue
  140. }
  141. ms = append(ms, m)
  142. }
  143. return err
  144. })
  145. return
  146. }
  147. func (b *boltAdapter) FlushDisabledJobs() (err error) {
  148. err = b.db.Update(func(tx *bolt.Tx) error {
  149. bucket := tx.Bucket([]byte(_statusBucketKey))
  150. c := bucket.Cursor()
  151. var m MirrorStatus
  152. for k, v := c.First(); k != nil; k, v = c.Next() {
  153. jsonErr := json.Unmarshal(v, &m)
  154. if jsonErr != nil {
  155. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  156. continue
  157. }
  158. if m.Status == Disabled || len(m.Name) == 0 {
  159. err = c.Delete()
  160. }
  161. }
  162. return err
  163. })
  164. return
  165. }
  166. func (b *boltAdapter) Close() error {
  167. if b.db != nil {
  168. return b.db.Close()
  169. }
  170. return nil
  171. }