db.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/boltdb/bolt"
  8. "github.com/go-redis/redis/v8"
  9. . "github.com/tuna/tunasync/internal"
  10. )
// dbAdapter is the persistence interface for worker records and per-mirror
// job statuses. makeDBAdapter returns a *kvDBAdapter as its implementation;
// the per-method notes below describe what that implementation does.
type dbAdapter interface {
	// Init prepares the store for use (kvDBAdapter creates its buckets here).
	Init() error
	// ListWorkers returns every stored worker record.
	ListWorkers() ([]WorkerStatus, error)
	// GetWorker returns the record stored for workerID.
	GetWorker(workerID string) (WorkerStatus, error)
	// DeleteWorker removes the record stored for workerID.
	DeleteWorker(workerID string) error
	// CreateWorker stores w under w.ID and returns w.
	CreateWorker(w WorkerStatus) (WorkerStatus, error)
	// RefreshWorker sets the worker's LastOnline to the current time, stores
	// the record again and returns it.
	RefreshWorker(workerID string) (WorkerStatus, error)
	// UpdateMirrorStatus stores status for the (workerID, mirrorID) pair and
	// returns it.
	UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
	// GetMirrorStatus returns the status stored for the (workerID, mirrorID)
	// pair.
	GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
	// ListMirrorStatus returns the stored statuses that belong to workerID.
	ListMirrorStatus(workerID string) ([]MirrorStatus, error)
	// ListAllMirrorStatus returns every stored mirror status.
	ListAllMirrorStatus() ([]MirrorStatus, error)
	// FlushDisabledJobs deletes stored statuses that are Disabled or have an
	// empty name.
	FlushDisabledJobs() error
	// Close releases the underlying store.
	Close() error
}
// kvAdapter is the bucketed key/value store that kvDBAdapter is built on.
// makeDBAdapter plugs in either a *boltAdapter or a *redisAdapter.
type kvAdapter interface {
	// InitBucket is called once per bucket before the bucket is used.
	InitBucket(bucket string) error
	// Get returns the value stored under key in bucket. Callers in this file
	// treat a nil value as "no such key".
	Get(bucket string, key string) ([]byte, error)
	// GetAll returns the key/value pairs of bucket.
	GetAll(bucket string) (map[string][]byte, error)
	// Put stores value under key in bucket.
	Put(bucket string, key string, value []byte) error
	// Delete removes key from bucket.
	Delete(bucket string, key string) error
	// Close releases the underlying store.
	Close() error
}
// Bucket names passed to the kvAdapter.
const (
	// _workerBucketKey is the bucket holding JSON-encoded WorkerStatus
	// records, keyed by worker ID.
	_workerBucketKey = "workers"
	// _statusBucketKey is the bucket holding JSON-encoded MirrorStatus
	// records, keyed by "<mirrorID>/<workerID>".
	_statusBucketKey = "mirror_status"
)
  37. func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
  38. if dbType == "bolt" {
  39. innerDB, err := bolt.Open(dbFile, 0600, &bolt.Options{
  40. Timeout: 5 * time.Second,
  41. })
  42. if err != nil {
  43. return nil, err
  44. }
  45. db := boltAdapter{
  46. db: innerDB,
  47. dbFile: dbFile,
  48. }
  49. kv := kvDBAdapter{
  50. db: &db,
  51. }
  52. err = kv.Init()
  53. return &kv, err
  54. } else if dbType == "redis" {
  55. opt, err := redis.ParseURL(dbFile)
  56. if err != nil {
  57. return nil, fmt.Errorf("bad redis url: %s", err)
  58. }
  59. innerDB := redis.NewClient(opt)
  60. db := redisAdapter{
  61. db: innerDB,
  62. }
  63. kv := kvDBAdapter{
  64. db: &db,
  65. }
  66. err = kv.Init()
  67. return &kv, err
  68. }
  69. // unsupported db-type
  70. return nil, fmt.Errorf("unsupported db-type: %s", dbType)
  71. }
// kvDBAdapter implements dbAdapter on top of any kvAdapter, storing
// JSON-encoded records in the worker and mirror-status buckets.
type kvDBAdapter struct {
	// db is the underlying bucketed key/value store.
	db kvAdapter
}
  75. func (b *kvDBAdapter) Init() error {
  76. err := b.db.InitBucket(_workerBucketKey)
  77. if err != nil {
  78. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  79. }
  80. err = b.db.InitBucket(_statusBucketKey)
  81. if err != nil {
  82. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  83. }
  84. return err
  85. }
  86. func (b *kvDBAdapter) ListWorkers() (ws []WorkerStatus, err error) {
  87. var workers map[string][]byte
  88. workers, err = b.db.GetAll(_workerBucketKey)
  89. var w WorkerStatus
  90. for _, v := range workers {
  91. jsonErr := json.Unmarshal(v, &w)
  92. if jsonErr != nil {
  93. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  94. continue
  95. }
  96. ws = append(ws, w)
  97. }
  98. return
  99. }
  100. func (b *kvDBAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
  101. var v []byte
  102. v, err = b.db.Get(_workerBucketKey, workerID)
  103. if v == nil {
  104. err = fmt.Errorf("invalid workerID %s", workerID)
  105. } else {
  106. err = json.Unmarshal(v, &w)
  107. }
  108. return
  109. }
// DeleteWorker removes the record keyed by workerID from the worker bucket
// and returns whatever error the underlying store reports.
func (b *kvDBAdapter) DeleteWorker(workerID string) error {
	return b.db.Delete(_workerBucketKey, workerID)
}
  113. func (b *kvDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  114. v, err := json.Marshal(w)
  115. if err == nil {
  116. err = b.db.Put(_workerBucketKey, w.ID, v)
  117. }
  118. return w, err
  119. }
  120. func (b *kvDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
  121. w, err = b.GetWorker(workerID)
  122. if err == nil {
  123. w.LastOnline = time.Now()
  124. w, err = b.CreateWorker(w)
  125. }
  126. return w, err
  127. }
  128. func (b *kvDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  129. id := mirrorID + "/" + workerID
  130. v, err := json.Marshal(status)
  131. if err == nil {
  132. err = b.db.Put(_statusBucketKey, id, v)
  133. }
  134. return status, err
  135. }
  136. func (b *kvDBAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
  137. id := mirrorID + "/" + workerID
  138. var v []byte
  139. v, err = b.db.Get(_statusBucketKey, id)
  140. if v == nil {
  141. err = fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
  142. } else if err == nil {
  143. err = json.Unmarshal(v, &m)
  144. }
  145. return
  146. }
  147. func (b *kvDBAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
  148. var vals map[string][]byte
  149. vals, err = b.db.GetAll(_statusBucketKey)
  150. if err != nil {
  151. return
  152. }
  153. for k, v := range vals {
  154. if wID := strings.Split(k, "/")[1]; wID == workerID {
  155. var m MirrorStatus
  156. jsonErr := json.Unmarshal(v, &m)
  157. if jsonErr != nil {
  158. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  159. continue
  160. }
  161. ms = append(ms, m)
  162. }
  163. }
  164. return
  165. }
  166. func (b *kvDBAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
  167. var vals map[string][]byte
  168. vals, err = b.db.GetAll(_statusBucketKey)
  169. if err != nil {
  170. return
  171. }
  172. for _, v := range vals {
  173. var m MirrorStatus
  174. jsonErr := json.Unmarshal(v, &m)
  175. if jsonErr != nil {
  176. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  177. continue
  178. }
  179. ms = append(ms, m)
  180. }
  181. return
  182. }
  183. func (b *kvDBAdapter) FlushDisabledJobs() (err error) {
  184. var vals map[string][]byte
  185. vals, err = b.db.GetAll(_statusBucketKey)
  186. if err != nil {
  187. return
  188. }
  189. for k, v := range vals {
  190. var m MirrorStatus
  191. jsonErr := json.Unmarshal(v, &m)
  192. if jsonErr != nil {
  193. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  194. continue
  195. }
  196. if m.Status == Disabled || len(m.Name) == 0 {
  197. deleteErr := b.db.Delete(_statusBucketKey, k)
  198. if deleteErr != nil {
  199. err = fmt.Errorf("%s; %s", err.Error(), deleteErr)
  200. }
  201. }
  202. }
  203. return
  204. }
  205. func (b *kvDBAdapter) Close() error {
  206. if b.db != nil {
  207. return b.db.Close()
  208. }
  209. return nil
  210. }