2
0

schedule.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package worker
  2. // schedule queue for jobs
  3. import (
  4. "sync"
  5. "time"
  6. "github.com/ryszard/goskiplist/skiplist"
  7. )
  8. type scheduleQueue struct {
  9. sync.Mutex
  10. list *skiplist.SkipList
  11. jobs map[string]bool
  12. }
  13. func timeLessThan(l, r interface{}) bool {
  14. tl := l.(time.Time)
  15. tr := r.(time.Time)
  16. return tl.Before(tr)
  17. }
  18. func newScheduleQueue() *scheduleQueue {
  19. queue := new(scheduleQueue)
  20. queue.list = skiplist.NewCustomMap(timeLessThan)
  21. queue.jobs = make(map[string]bool)
  22. return queue
  23. }
  24. func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) {
  25. q.Lock()
  26. defer q.Unlock()
  27. if _, ok := q.jobs[job.Name()]; ok {
  28. logger.Warningf("Job %s already scheduled, removing the existing one", job.Name())
  29. q.unsafeRemove(job.Name())
  30. }
  31. q.jobs[job.Name()] = true
  32. q.list.Set(schedTime, job)
  33. logger.Debugf("Added job %s @ %v", job.Name(), schedTime)
  34. }
  35. // pop out the first job if it's time to run it
  36. func (q *scheduleQueue) Pop() *mirrorJob {
  37. q.Lock()
  38. defer q.Unlock()
  39. first := q.list.SeekToFirst()
  40. if first == nil {
  41. return nil
  42. }
  43. defer first.Close()
  44. t := first.Key().(time.Time)
  45. if t.Before(time.Now()) {
  46. job := first.Value().(*mirrorJob)
  47. q.list.Delete(first.Key())
  48. delete(q.jobs, job.Name())
  49. logger.Debug("Popped out job %s @%v", job.Name(), t)
  50. return job
  51. }
  52. return nil
  53. }
  54. // remove job
  55. func (q *scheduleQueue) Remove(name string) bool {
  56. q.Lock()
  57. defer q.Unlock()
  58. return q.unsafeRemove(name)
  59. }
  60. // remove job
  61. func (q *scheduleQueue) unsafeRemove(name string) bool {
  62. cur := q.list.Iterator()
  63. defer cur.Close()
  64. for cur.Next() {
  65. cj := cur.Value().(*mirrorJob)
  66. if cj.Name() == name {
  67. q.list.Delete(cur.Key())
  68. delete(q.jobs, name)
  69. return true
  70. }
  71. }
  72. return false
  73. }