2
0

schedule.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package worker
  2. // schedule queue for jobs
  3. import (
  4. "sync"
  5. "time"
  6. "github.com/ryszard/goskiplist/skiplist"
  7. )
  8. type scheduleQueue struct {
  9. sync.Mutex
  10. list *skiplist.SkipList
  11. jobs map[string]bool
  12. }
  13. type jobScheduleInfo struct {
  14. jobName string
  15. nextScheduled time.Time
  16. }
  17. func timeLessThan(l, r interface{}) bool {
  18. tl := l.(time.Time)
  19. tr := r.(time.Time)
  20. return tl.Before(tr)
  21. }
  22. func newScheduleQueue() *scheduleQueue {
  23. queue := new(scheduleQueue)
  24. queue.list = skiplist.NewCustomMap(timeLessThan)
  25. queue.jobs = make(map[string]bool)
  26. return queue
  27. }
  28. func (q *scheduleQueue) GetJobs() (jobs []jobScheduleInfo) {
  29. cur := q.list.Iterator()
  30. defer cur.Close()
  31. for cur.Next() {
  32. cj := cur.Value().(*mirrorJob)
  33. jobs = append(jobs, jobScheduleInfo{
  34. cj.Name(),
  35. cur.Key().(time.Time),
  36. })
  37. }
  38. return
  39. }
  40. func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) {
  41. q.Lock()
  42. defer q.Unlock()
  43. if _, ok := q.jobs[job.Name()]; ok {
  44. logger.Warningf("Job %s already scheduled, removing the existing one", job.Name())
  45. q.unsafeRemove(job.Name())
  46. }
  47. q.jobs[job.Name()] = true
  48. q.list.Set(schedTime, job)
  49. logger.Debugf("Added job %s @ %v", job.Name(), schedTime)
  50. }
  51. // pop out the first job if it's time to run it
  52. func (q *scheduleQueue) Pop() *mirrorJob {
  53. q.Lock()
  54. defer q.Unlock()
  55. first := q.list.SeekToFirst()
  56. if first == nil {
  57. return nil
  58. }
  59. defer first.Close()
  60. t := first.Key().(time.Time)
  61. if t.Before(time.Now()) {
  62. job := first.Value().(*mirrorJob)
  63. q.list.Delete(first.Key())
  64. delete(q.jobs, job.Name())
  65. logger.Debug("Popped out job %s @%v", job.Name(), t)
  66. return job
  67. }
  68. return nil
  69. }
  70. // remove job
  71. func (q *scheduleQueue) Remove(name string) bool {
  72. q.Lock()
  73. defer q.Unlock()
  74. return q.unsafeRemove(name)
  75. }
  76. // remove job
  77. func (q *scheduleQueue) unsafeRemove(name string) bool {
  78. cur := q.list.Iterator()
  79. defer cur.Close()
  80. for cur.Next() {
  81. cj := cur.Value().(*mirrorJob)
  82. if cj.Name() == name {
  83. q.list.Delete(cur.Key())
  84. delete(q.jobs, name)
  85. return true
  86. }
  87. }
  88. return false
  89. }