db.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/boltdb/bolt"
  8. "github.com/go-redis/redis/v8"
  9. . "github.com/tuna/tunasync/internal"
  10. )
  11. type dbAdapter interface {
  12. Init() error
  13. ListWorkers() ([]WorkerStatus, error)
  14. GetWorker(workerID string) (WorkerStatus, error)
  15. DeleteWorker(workerID string) error
  16. CreateWorker(w WorkerStatus) (WorkerStatus, error)
  17. RefreshWorker(workerID string) (WorkerStatus, error)
  18. UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
  19. GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
  20. ListMirrorStatus(workerID string) ([]MirrorStatus, error)
  21. ListAllMirrorStatus() ([]MirrorStatus, error)
  22. FlushDisabledJobs() error
  23. Close() error
  24. }
  25. type kvAdapter interface {
  26. InitBucket(bucket string) error
  27. Get(bucket string, key string) ([]byte, error)
  28. GetAll(bucket string) (map[string][]byte, error)
  29. Put(bucket string, key string, value []byte) error
  30. Delete(bucket string, key string) error
  31. Close() error
  32. }
  33. const (
  34. _workerBucketKey = "workers"
  35. _statusBucketKey = "mirror_status"
  36. )
  37. func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
  38. if dbType == "bolt" {
  39. innerDB, err := bolt.Open(dbFile, 0600, &bolt.Options{
  40. Timeout: 5 * time.Second,
  41. })
  42. if err != nil {
  43. return nil, err
  44. }
  45. db := boltAdapter{
  46. db: innerDB,
  47. dbFile: dbFile,
  48. }
  49. kv := kvDBAdapter{
  50. db: &db,
  51. }
  52. err = kv.Init()
  53. return &kv, err
  54. } else if dbType == "redis" {
  55. opt, err := redis.ParseURL(dbFile)
  56. if err != nil {
  57. return nil, fmt.Errorf("bad redis url: %s", err)
  58. }
  59. innerDB := redis.NewClient(opt)
  60. db := redisAdapter{
  61. db: innerDB,
  62. }
  63. kv := kvDBAdapter{
  64. db: &db,
  65. }
  66. err = kv.Init()
  67. return &kv, err
  68. }
  69. // unsupported db-type
  70. return nil, fmt.Errorf("unsupported db-type: %s", dbType)
  71. }
  72. type kvDBAdapter struct {
  73. db kvAdapter
  74. }
  75. func (b *kvDBAdapter) Init() error {
  76. err := b.db.InitBucket(_workerBucketKey)
  77. if err != nil {
  78. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  79. }
  80. err = b.db.InitBucket(_statusBucketKey)
  81. if err != nil {
  82. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  83. }
  84. return err
  85. }
  86. func (b *kvDBAdapter) ListWorkers() (ws []WorkerStatus, err error) {
  87. var workers map[string][]byte
  88. workers, err = b.db.GetAll(_workerBucketKey)
  89. var w WorkerStatus
  90. for _, v := range workers {
  91. jsonErr := json.Unmarshal(v, &w)
  92. if jsonErr != nil {
  93. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  94. continue
  95. }
  96. ws = append(ws, w)
  97. }
  98. return
  99. }
  100. func (b *kvDBAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
  101. var v []byte
  102. v, err = b.db.Get(_workerBucketKey, workerID)
  103. if v == nil {
  104. err = fmt.Errorf("invalid workerID %s", workerID)
  105. } else {
  106. err = json.Unmarshal(v, &w)
  107. }
  108. return
  109. }
  110. func (b *kvDBAdapter) DeleteWorker(workerID string) error {
  111. v, _ := b.db.Get(_workerBucketKey, workerID)
  112. if v == nil {
  113. return fmt.Errorf("invalid workerID %s", workerID)
  114. }
  115. return b.db.Delete(_workerBucketKey, workerID)
  116. }
  117. func (b *kvDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  118. v, err := json.Marshal(w)
  119. if err == nil {
  120. err = b.db.Put(_workerBucketKey, w.ID, v)
  121. }
  122. return w, err
  123. }
  124. func (b *kvDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
  125. w, err = b.GetWorker(workerID)
  126. if err == nil {
  127. w.LastOnline = time.Now()
  128. w, err = b.CreateWorker(w)
  129. }
  130. return w, err
  131. }
  132. func (b *kvDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  133. id := mirrorID + "/" + workerID
  134. v, err := json.Marshal(status)
  135. if err == nil {
  136. err = b.db.Put(_statusBucketKey, id, v)
  137. }
  138. return status, err
  139. }
  140. func (b *kvDBAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
  141. id := mirrorID + "/" + workerID
  142. var v []byte
  143. v, err = b.db.Get(_statusBucketKey, id)
  144. if v == nil {
  145. err = fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
  146. } else if err == nil {
  147. err = json.Unmarshal(v, &m)
  148. }
  149. return
  150. }
  151. func (b *kvDBAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
  152. var vals map[string][]byte
  153. vals, err = b.db.GetAll(_statusBucketKey)
  154. if err != nil {
  155. return
  156. }
  157. for k, v := range vals {
  158. if wID := strings.Split(k, "/")[1]; wID == workerID {
  159. var m MirrorStatus
  160. jsonErr := json.Unmarshal(v, &m)
  161. if jsonErr != nil {
  162. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  163. continue
  164. }
  165. ms = append(ms, m)
  166. }
  167. }
  168. return
  169. }
  170. func (b *kvDBAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
  171. var vals map[string][]byte
  172. vals, err = b.db.GetAll(_statusBucketKey)
  173. if err != nil {
  174. return
  175. }
  176. for _, v := range vals {
  177. var m MirrorStatus
  178. jsonErr := json.Unmarshal(v, &m)
  179. if jsonErr != nil {
  180. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  181. continue
  182. }
  183. ms = append(ms, m)
  184. }
  185. return
  186. }
  187. func (b *kvDBAdapter) FlushDisabledJobs() (err error) {
  188. var vals map[string][]byte
  189. vals, err = b.db.GetAll(_statusBucketKey)
  190. if err != nil {
  191. return
  192. }
  193. for k, v := range vals {
  194. var m MirrorStatus
  195. jsonErr := json.Unmarshal(v, &m)
  196. if jsonErr != nil {
  197. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  198. continue
  199. }
  200. if m.Status == Disabled || len(m.Name) == 0 {
  201. deleteErr := b.db.Delete(_statusBucketKey, k)
  202. if deleteErr != nil {
  203. err = fmt.Errorf("%s; %s", err.Error(), deleteErr)
  204. }
  205. }
  206. }
  207. return
  208. }
  209. func (b *kvDBAdapter) Close() error {
  210. if b.db != nil {
  211. return b.db.Close()
  212. }
  213. return nil
  214. }