2
0

db.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package manager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/boltdb/bolt"
  8. "github.com/go-redis/redis/v8"
  9. . "github.com/tuna/tunasync/internal"
  10. )
  11. type dbAdapter interface {
  12. Init() error
  13. ListWorkers() ([]WorkerStatus, error)
  14. GetWorker(workerID string) (WorkerStatus, error)
  15. DeleteWorker(workerID string) error
  16. CreateWorker(w WorkerStatus) (WorkerStatus, error)
  17. RefreshWorker(workerID string) (WorkerStatus, error)
  18. UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
  19. GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
  20. ListMirrorStatus(workerID string) ([]MirrorStatus, error)
  21. ListAllMirrorStatus() ([]MirrorStatus, error)
  22. FlushDisabledJobs() error
  23. Close() error
  24. }
  25. // interface for a kv database
  26. type kvAdapter interface {
  27. InitBucket(bucket string) error
  28. Get(bucket string, key string) ([]byte, error)
  29. GetAll(bucket string) (map[string][]byte, error)
  30. Put(bucket string, key string, value []byte) error
  31. Delete(bucket string, key string) error
  32. Close() error
  33. }
  34. const (
  35. _workerBucketKey = "workers"
  36. _statusBucketKey = "mirror_status"
  37. )
  38. func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
  39. if dbType == "bolt" {
  40. innerDB, err := bolt.Open(dbFile, 0600, &bolt.Options{
  41. Timeout: 5 * time.Second,
  42. })
  43. if err != nil {
  44. return nil, err
  45. }
  46. db := boltAdapter{
  47. db: innerDB,
  48. }
  49. kv := kvDBAdapter{
  50. db: &db,
  51. }
  52. err = kv.Init()
  53. return &kv, err
  54. } else if dbType == "redis" {
  55. opt, err := redis.ParseURL(dbFile)
  56. if err != nil {
  57. return nil, fmt.Errorf("bad redis url: %s", err)
  58. }
  59. innerDB := redis.NewClient(opt)
  60. db := redisAdapter{
  61. db: innerDB,
  62. }
  63. kv := kvDBAdapter{
  64. db: &db,
  65. }
  66. err = kv.Init()
  67. return &kv, err
  68. }
  69. // unsupported db-type
  70. return nil, fmt.Errorf("unsupported db-type: %s", dbType)
  71. }
  72. // use the underlying kv database to store data
  73. type kvDBAdapter struct {
  74. db kvAdapter
  75. }
  76. func (b *kvDBAdapter) Init() error {
  77. err := b.db.InitBucket(_workerBucketKey)
  78. if err != nil {
  79. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  80. }
  81. err = b.db.InitBucket(_statusBucketKey)
  82. if err != nil {
  83. return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error())
  84. }
  85. return err
  86. }
  87. func (b *kvDBAdapter) ListWorkers() (ws []WorkerStatus, err error) {
  88. var workers map[string][]byte
  89. workers, err = b.db.GetAll(_workerBucketKey)
  90. var w WorkerStatus
  91. for _, v := range workers {
  92. jsonErr := json.Unmarshal(v, &w)
  93. if jsonErr != nil {
  94. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  95. continue
  96. }
  97. ws = append(ws, w)
  98. }
  99. return
  100. }
  101. func (b *kvDBAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
  102. var v []byte
  103. v, err = b.db.Get(_workerBucketKey, workerID)
  104. if v == nil {
  105. err = fmt.Errorf("invalid workerID %s", workerID)
  106. } else {
  107. err = json.Unmarshal(v, &w)
  108. }
  109. return
  110. }
  111. func (b *kvDBAdapter) DeleteWorker(workerID string) error {
  112. v, _ := b.db.Get(_workerBucketKey, workerID)
  113. if v == nil {
  114. return fmt.Errorf("invalid workerID %s", workerID)
  115. }
  116. return b.db.Delete(_workerBucketKey, workerID)
  117. }
  118. func (b *kvDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  119. v, err := json.Marshal(w)
  120. if err == nil {
  121. err = b.db.Put(_workerBucketKey, w.ID, v)
  122. }
  123. return w, err
  124. }
  125. func (b *kvDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
  126. w, err = b.GetWorker(workerID)
  127. if err == nil {
  128. w.LastOnline = time.Now()
  129. w, err = b.CreateWorker(w)
  130. }
  131. return w, err
  132. }
  133. func (b *kvDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  134. id := mirrorID + "/" + workerID
  135. v, err := json.Marshal(status)
  136. if err == nil {
  137. err = b.db.Put(_statusBucketKey, id, v)
  138. }
  139. return status, err
  140. }
  141. func (b *kvDBAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
  142. id := mirrorID + "/" + workerID
  143. var v []byte
  144. v, err = b.db.Get(_statusBucketKey, id)
  145. if v == nil {
  146. err = fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
  147. } else if err == nil {
  148. err = json.Unmarshal(v, &m)
  149. }
  150. return
  151. }
  152. func (b *kvDBAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
  153. var vals map[string][]byte
  154. vals, err = b.db.GetAll(_statusBucketKey)
  155. if err != nil {
  156. return
  157. }
  158. for k, v := range vals {
  159. if wID := strings.Split(k, "/")[1]; wID == workerID {
  160. var m MirrorStatus
  161. jsonErr := json.Unmarshal(v, &m)
  162. if jsonErr != nil {
  163. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  164. continue
  165. }
  166. ms = append(ms, m)
  167. }
  168. }
  169. return
  170. }
  171. func (b *kvDBAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
  172. var vals map[string][]byte
  173. vals, err = b.db.GetAll(_statusBucketKey)
  174. if err != nil {
  175. return
  176. }
  177. for _, v := range vals {
  178. var m MirrorStatus
  179. jsonErr := json.Unmarshal(v, &m)
  180. if jsonErr != nil {
  181. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  182. continue
  183. }
  184. ms = append(ms, m)
  185. }
  186. return
  187. }
  188. func (b *kvDBAdapter) FlushDisabledJobs() (err error) {
  189. var vals map[string][]byte
  190. vals, err = b.db.GetAll(_statusBucketKey)
  191. if err != nil {
  192. return
  193. }
  194. for k, v := range vals {
  195. var m MirrorStatus
  196. jsonErr := json.Unmarshal(v, &m)
  197. if jsonErr != nil {
  198. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  199. continue
  200. }
  201. if m.Status == Disabled || len(m.Name) == 0 {
  202. deleteErr := b.db.Delete(_statusBucketKey, k)
  203. if deleteErr != nil {
  204. err = fmt.Errorf("%s; %s", err.Error(), deleteErr)
  205. }
  206. }
  207. }
  208. return
  209. }
  210. func (b *kvDBAdapter) Close() error {
  211. if b.db != nil {
  212. return b.db.Close()
  213. }
  214. return nil
  215. }