db.go 6.0 KB


  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/boltdb/bolt"
  8. "github.com/dgraph-io/badger/v2"
  9. "github.com/go-redis/redis/v8"
  10. "github.com/pkg/errors"
  11. "github.com/syndtr/goleveldb/leveldb"
  12. . "github.com/tuna/tunasync/internal"
  13. )
  14. type dbAdapter interface {
  15. Init() error
  16. ListWorkers() ([]WorkerStatus, error)
  17. GetWorker(workerID string) (WorkerStatus, error)
  18. DeleteWorker(workerID string) error
  19. CreateWorker(w WorkerStatus) (WorkerStatus, error)
  20. RefreshWorker(workerID string) (WorkerStatus, error)
  21. UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
  22. GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
  23. ListMirrorStatus(workerID string) ([]MirrorStatus, error)
  24. ListAllMirrorStatus() ([]MirrorStatus, error)
  25. FlushDisabledJobs() error
  26. Close() error
  27. }
  28. // interface for a kv database
  29. type kvAdapter interface {
  30. InitBucket(bucket string) error
  31. Get(bucket string, key string) ([]byte, error)
  32. GetAll(bucket string) (map[string][]byte, error)
  33. Put(bucket string, key string, value []byte) error
  34. Delete(bucket string, key string) error
  35. Close() error
  36. }
  37. const (
  38. _workerBucketKey = "workers"
  39. _statusBucketKey = "mirror_status"
  40. )
  41. func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
  42. if dbType == "bolt" {
  43. innerDB, err := bolt.Open(dbFile, 0600, &bolt.Options{
  44. Timeout: 5 * time.Second,
  45. })
  46. if err != nil {
  47. return nil, err
  48. }
  49. db := boltAdapter{
  50. db: innerDB,
  51. }
  52. kv := kvDBAdapter{
  53. db: &db,
  54. }
  55. err = kv.Init()
  56. return &kv, err
  57. } else if dbType == "redis" {
  58. opt, err := redis.ParseURL(dbFile)
  59. if err != nil {
  60. return nil, fmt.Errorf("bad redis url: %s", err)
  61. }
  62. innerDB := redis.NewClient(opt)
  63. db := redisAdapter{
  64. db: innerDB,
  65. }
  66. kv := kvDBAdapter{
  67. db: &db,
  68. }
  69. err = kv.Init()
  70. return &kv, err
  71. } else if dbType == "badger" {
  72. innerDB, err := badger.Open(badger.DefaultOptions(dbFile))
  73. if err != nil {
  74. return nil, err
  75. }
  76. db := badgerAdapter{
  77. db: innerDB,
  78. }
  79. kv := kvDBAdapter{
  80. db: &db,
  81. }
  82. err = kv.Init()
  83. return &kv, err
  84. } else if dbType == "leveldb" {
  85. innerDB, err := leveldb.OpenFile(dbFile, nil)
  86. if err != nil {
  87. return nil, err
  88. }
  89. db := leveldbAdapter{
  90. db: innerDB,
  91. }
  92. kv := kvDBAdapter{
  93. db: &db,
  94. }
  95. err = kv.Init()
  96. return &kv, err
  97. }
  98. // unsupported db-type
  99. return nil, fmt.Errorf("unsupported db-type: %s", dbType)
  100. }
  101. // use the underlying kv database to store data
  102. type kvDBAdapter struct {
  103. db kvAdapter
  104. }
  105. func (b *kvDBAdapter) Init() error {
  106. err := b.db.InitBucket(_workerBucketKey)
  107. if err != nil {
  108. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  109. }
  110. err = b.db.InitBucket(_statusBucketKey)
  111. if err != nil {
  112. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  113. }
  114. return err
  115. }
  116. func (b *kvDBAdapter) ListWorkers() (ws []WorkerStatus, err error) {
  117. var workers map[string][]byte
  118. workers, err = b.db.GetAll(_workerBucketKey)
  119. var w WorkerStatus
  120. for _, v := range workers {
  121. jsonErr := json.Unmarshal(v, &w)
  122. if jsonErr != nil {
  123. err = errors.Wrap(err, jsonErr.Error())
  124. continue
  125. }
  126. ws = append(ws, w)
  127. }
  128. return
  129. }
  130. func (b *kvDBAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
  131. var v []byte
  132. v, err = b.db.Get(_workerBucketKey, workerID)
  133. if v == nil {
  134. err = fmt.Errorf("invalid workerID %s", workerID)
  135. } else {
  136. err = json.Unmarshal(v, &w)
  137. }
  138. return
  139. }
  140. func (b *kvDBAdapter) DeleteWorker(workerID string) error {
  141. v, _ := b.db.Get(_workerBucketKey, workerID)
  142. if v == nil {
  143. return fmt.Errorf("invalid workerID %s", workerID)
  144. }
  145. return b.db.Delete(_workerBucketKey, workerID)
  146. }
  147. func (b *kvDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  148. v, err := json.Marshal(w)
  149. if err == nil {
  150. err = b.db.Put(_workerBucketKey, w.ID, v)
  151. }
  152. return w, err
  153. }
  154. func (b *kvDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
  155. w, err = b.GetWorker(workerID)
  156. if err == nil {
  157. w.LastOnline = time.Now()
  158. w, err = b.CreateWorker(w)
  159. }
  160. return w, err
  161. }
  162. func (b *kvDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  163. id := mirrorID + "/" + workerID
  164. v, err := json.Marshal(status)
  165. if err == nil {
  166. err = b.db.Put(_statusBucketKey, id, v)
  167. }
  168. return status, err
  169. }
  170. func (b *kvDBAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
  171. id := mirrorID + "/" + workerID
  172. var v []byte
  173. v, err = b.db.Get(_statusBucketKey, id)
  174. if v == nil {
  175. err = fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
  176. } else if err == nil {
  177. err = json.Unmarshal(v, &m)
  178. }
  179. return
  180. }
  181. func (b *kvDBAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
  182. var vals map[string][]byte
  183. vals, err = b.db.GetAll(_statusBucketKey)
  184. if err != nil {
  185. return
  186. }
  187. for k, v := range vals {
  188. if wID := strings.Split(k, "/")[1]; wID == workerID {
  189. var m MirrorStatus
  190. jsonErr := json.Unmarshal(v, &m)
  191. if jsonErr != nil {
  192. err = errors.Wrap(err, jsonErr.Error())
  193. continue
  194. }
  195. ms = append(ms, m)
  196. }
  197. }
  198. return
  199. }
  200. func (b *kvDBAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
  201. var vals map[string][]byte
  202. vals, err = b.db.GetAll(_statusBucketKey)
  203. if err != nil {
  204. return
  205. }
  206. for _, v := range vals {
  207. var m MirrorStatus
  208. jsonErr := json.Unmarshal(v, &m)
  209. if jsonErr != nil {
  210. err = errors.Wrap(err, jsonErr.Error())
  211. continue
  212. }
  213. ms = append(ms, m)
  214. }
  215. return
  216. }
  217. func (b *kvDBAdapter) FlushDisabledJobs() (err error) {
  218. var vals map[string][]byte
  219. vals, err = b.db.GetAll(_statusBucketKey)
  220. if err != nil {
  221. return
  222. }
  223. for k, v := range vals {
  224. var m MirrorStatus
  225. jsonErr := json.Unmarshal(v, &m)
  226. if jsonErr != nil {
  227. err = errors.Wrap(err, jsonErr.Error())
  228. continue
  229. }
  230. if m.Status == Disabled || len(m.Name) == 0 {
  231. deleteErr := b.db.Delete(_statusBucketKey, k)
  232. if deleteErr != nil {
  233. err = errors.Wrap(err, deleteErr.Error())
  234. }
  235. }
  236. }
  237. return
  238. }
  239. func (b *kvDBAdapter) Close() error {
  240. if b.db != nil {
  241. return b.db.Close()
  242. }
  243. return nil
  244. }