db.go 5.7 KB


  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/boltdb/bolt"
  8. "github.com/dgraph-io/badger/v2"
  9. "github.com/go-redis/redis/v8"
  10. . "github.com/tuna/tunasync/internal"
  11. )
  12. type dbAdapter interface {
  13. Init() error
  14. ListWorkers() ([]WorkerStatus, error)
  15. GetWorker(workerID string) (WorkerStatus, error)
  16. DeleteWorker(workerID string) error
  17. CreateWorker(w WorkerStatus) (WorkerStatus, error)
  18. RefreshWorker(workerID string) (WorkerStatus, error)
  19. UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
  20. GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
  21. ListMirrorStatus(workerID string) ([]MirrorStatus, error)
  22. ListAllMirrorStatus() ([]MirrorStatus, error)
  23. FlushDisabledJobs() error
  24. Close() error
  25. }
  26. // interface for a kv database
  27. type kvAdapter interface {
  28. InitBucket(bucket string) error
  29. Get(bucket string, key string) ([]byte, error)
  30. GetAll(bucket string) (map[string][]byte, error)
  31. Put(bucket string, key string, value []byte) error
  32. Delete(bucket string, key string) error
  33. Close() error
  34. }
  35. const (
  36. _workerBucketKey = "workers"
  37. _statusBucketKey = "mirror_status"
  38. )
  39. func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
  40. if dbType == "bolt" {
  41. innerDB, err := bolt.Open(dbFile, 0600, &bolt.Options{
  42. Timeout: 5 * time.Second,
  43. })
  44. if err != nil {
  45. return nil, err
  46. }
  47. db := boltAdapter{
  48. db: innerDB,
  49. }
  50. kv := kvDBAdapter{
  51. db: &db,
  52. }
  53. err = kv.Init()
  54. return &kv, err
  55. } else if dbType == "redis" {
  56. opt, err := redis.ParseURL(dbFile)
  57. if err != nil {
  58. return nil, fmt.Errorf("bad redis url: %s", err)
  59. }
  60. innerDB := redis.NewClient(opt)
  61. db := redisAdapter{
  62. db: innerDB,
  63. }
  64. kv := kvDBAdapter{
  65. db: &db,
  66. }
  67. err = kv.Init()
  68. return &kv, err
  69. } else if dbType == "badger" {
  70. innerDB, err := badger.Open(badger.DefaultOptions(dbFile))
  71. if err != nil {
  72. return nil, err
  73. }
  74. db := badgerAdapter{
  75. db: innerDB,
  76. }
  77. kv := kvDBAdapter{
  78. db: &db,
  79. }
  80. err = kv.Init()
  81. return &kv, err
  82. }
  83. // unsupported db-type
  84. return nil, fmt.Errorf("unsupported db-type: %s", dbType)
  85. }
  86. // use the underlying kv database to store data
  87. type kvDBAdapter struct {
  88. db kvAdapter
  89. }
  90. func (b *kvDBAdapter) Init() error {
  91. err := b.db.InitBucket(_workerBucketKey)
  92. if err != nil {
  93. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  94. }
  95. err = b.db.InitBucket(_statusBucketKey)
  96. if err != nil {
  97. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  98. }
  99. return err
  100. }
  101. func (b *kvDBAdapter) ListWorkers() (ws []WorkerStatus, err error) {
  102. var workers map[string][]byte
  103. workers, err = b.db.GetAll(_workerBucketKey)
  104. var w WorkerStatus
  105. for _, v := range workers {
  106. jsonErr := json.Unmarshal(v, &w)
  107. if jsonErr != nil {
  108. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  109. continue
  110. }
  111. ws = append(ws, w)
  112. }
  113. return
  114. }
  115. func (b *kvDBAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
  116. var v []byte
  117. v, err = b.db.Get(_workerBucketKey, workerID)
  118. if v == nil {
  119. err = fmt.Errorf("invalid workerID %s", workerID)
  120. } else {
  121. err = json.Unmarshal(v, &w)
  122. }
  123. return
  124. }
  125. func (b *kvDBAdapter) DeleteWorker(workerID string) error {
  126. v, _ := b.db.Get(_workerBucketKey, workerID)
  127. if v == nil {
  128. return fmt.Errorf("invalid workerID %s", workerID)
  129. }
  130. return b.db.Delete(_workerBucketKey, workerID)
  131. }
  132. func (b *kvDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  133. v, err := json.Marshal(w)
  134. if err == nil {
  135. err = b.db.Put(_workerBucketKey, w.ID, v)
  136. }
  137. return w, err
  138. }
  139. func (b *kvDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
  140. w, err = b.GetWorker(workerID)
  141. if err == nil {
  142. w.LastOnline = time.Now()
  143. w, err = b.CreateWorker(w)
  144. }
  145. return w, err
  146. }
  147. func (b *kvDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  148. id := mirrorID + "/" + workerID
  149. v, err := json.Marshal(status)
  150. if err == nil {
  151. err = b.db.Put(_statusBucketKey, id, v)
  152. }
  153. return status, err
  154. }
  155. func (b *kvDBAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
  156. id := mirrorID + "/" + workerID
  157. var v []byte
  158. v, err = b.db.Get(_statusBucketKey, id)
  159. if v == nil {
  160. err = fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
  161. } else if err == nil {
  162. err = json.Unmarshal(v, &m)
  163. }
  164. return
  165. }
  166. func (b *kvDBAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
  167. var vals map[string][]byte
  168. vals, err = b.db.GetAll(_statusBucketKey)
  169. if err != nil {
  170. return
  171. }
  172. for k, v := range vals {
  173. if wID := strings.Split(k, "/")[1]; wID == workerID {
  174. var m MirrorStatus
  175. jsonErr := json.Unmarshal(v, &m)
  176. if jsonErr != nil {
  177. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  178. continue
  179. }
  180. ms = append(ms, m)
  181. }
  182. }
  183. return
  184. }
  185. func (b *kvDBAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
  186. var vals map[string][]byte
  187. vals, err = b.db.GetAll(_statusBucketKey)
  188. if err != nil {
  189. return
  190. }
  191. for _, v := range vals {
  192. var m MirrorStatus
  193. jsonErr := json.Unmarshal(v, &m)
  194. if jsonErr != nil {
  195. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  196. continue
  197. }
  198. ms = append(ms, m)
  199. }
  200. return
  201. }
  202. func (b *kvDBAdapter) FlushDisabledJobs() (err error) {
  203. var vals map[string][]byte
  204. vals, err = b.db.GetAll(_statusBucketKey)
  205. if err != nil {
  206. return
  207. }
  208. for k, v := range vals {
  209. var m MirrorStatus
  210. jsonErr := json.Unmarshal(v, &m)
  211. if jsonErr != nil {
  212. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  213. continue
  214. }
  215. if m.Status == Disabled || len(m.Name) == 0 {
  216. deleteErr := b.db.Delete(_statusBucketKey, k)
  217. if deleteErr != nil {
  218. err = fmt.Errorf("%s; %s", err.Error(), deleteErr)
  219. }
  220. }
  221. }
  222. return
  223. }
  224. func (b *kvDBAdapter) Close() error {
  225. if b.db != nil {
  226. return b.db.Close()
  227. }
  228. return nil
  229. }