db_redis.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package manager
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/go-redis/redis/v8"
  9. . "github.com/tuna/tunasync/internal"
  10. )
  11. type redisAdapter struct {
  12. db *redis.Client
  13. }
  14. var ctx = context.Background()
  15. func (b *redisAdapter) Init() (err error) {
  16. return nil
  17. }
  18. func (b *redisAdapter) ListWorkers() (ws []WorkerStatus, err error) {
  19. var val map[string]string
  20. val, err = b.db.HGetAll(ctx, _workerBucketKey).Result()
  21. if err == nil {
  22. var w WorkerStatus
  23. for _, v := range val {
  24. jsonErr := json.Unmarshal([]byte(v), &w)
  25. if jsonErr != nil {
  26. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  27. continue
  28. }
  29. ws = append(ws, w)
  30. }
  31. }
  32. return
  33. }
  34. func (b *redisAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
  35. var val string
  36. val, err = b.db.HGet(ctx, _workerBucketKey, workerID).Result()
  37. if err == nil {
  38. err = json.Unmarshal([]byte(val), &w)
  39. } else {
  40. err = fmt.Errorf("invalid workerID %s", workerID)
  41. }
  42. return
  43. }
  44. func (b *redisAdapter) DeleteWorker(workerID string) (err error) {
  45. _, err = b.db.HDel(ctx, _workerBucketKey, workerID).Result()
  46. if err != nil {
  47. err = fmt.Errorf("invalid workerID %s", workerID)
  48. }
  49. return
  50. }
  51. func (b *redisAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
  52. var v []byte
  53. v, err := json.Marshal(w)
  54. if err == nil {
  55. _, err = b.db.HSet(ctx, _workerBucketKey, w.ID, string(v)).Result()
  56. }
  57. return w, err
  58. }
  59. func (b *redisAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
  60. w, err = b.GetWorker(workerID)
  61. if err == nil {
  62. w.LastOnline = time.Now()
  63. w, err = b.CreateWorker(w)
  64. }
  65. return w, err
  66. }
  67. func (b *redisAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
  68. id := mirrorID + "/" + workerID
  69. v, err := json.Marshal(status)
  70. if err == nil {
  71. _, err = b.db.HSet(ctx, _statusBucketKey, id, string(v)).Result()
  72. }
  73. return status, err
  74. }
  75. func (b *redisAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) {
  76. id := mirrorID + "/" + workerID
  77. var val string
  78. val, err = b.db.HGet(ctx, _statusBucketKey, id).Result()
  79. if err == nil {
  80. err = json.Unmarshal([]byte(val), &m)
  81. } else {
  82. err = fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
  83. }
  84. return
  85. }
  86. func (b *redisAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) {
  87. var val map[string]string
  88. val, err = b.db.HGetAll(ctx, _statusBucketKey).Result()
  89. if err == nil {
  90. var m MirrorStatus
  91. for k, v := range val {
  92. if wID := strings.Split(string(k), "/")[1]; wID == workerID {
  93. jsonErr := json.Unmarshal([]byte(v), &m)
  94. if jsonErr != nil {
  95. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  96. continue
  97. }
  98. ms = append(ms, m)
  99. }
  100. }
  101. }
  102. return
  103. }
  104. func (b *redisAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) {
  105. var val map[string]string
  106. val, err = b.db.HGetAll(ctx, _statusBucketKey).Result()
  107. if err == nil {
  108. var m MirrorStatus
  109. for _, v := range val {
  110. jsonErr := json.Unmarshal([]byte(v), &m)
  111. if jsonErr != nil {
  112. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  113. continue
  114. }
  115. ms = append(ms, m)
  116. }
  117. }
  118. return
  119. }
  120. func (b *redisAdapter) FlushDisabledJobs() (err error) {
  121. var val map[string]string
  122. val, err = b.db.HGetAll(ctx, _statusBucketKey).Result()
  123. if err == nil {
  124. var m MirrorStatus
  125. for k, v := range val {
  126. jsonErr := json.Unmarshal([]byte(v), &m)
  127. if jsonErr != nil {
  128. err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
  129. continue
  130. }
  131. if m.Status == Disabled || len(m.Name) == 0 {
  132. _, err = b.db.HDel(ctx, _statusBucketKey, k).Result()
  133. }
  134. }
  135. }
  136. return
  137. }
  138. func (b *redisAdapter) Close() error {
  139. if b.db != nil {
  140. return b.db.Close()
  141. }
  142. return nil
  143. }