2
0

schedule.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package worker
  2. // schedule queue for jobs
  3. import (
  4. "sync"
  5. "time"
  6. "github.com/ryszard/goskiplist/skiplist"
  7. )
  8. type scheduleQueue struct {
  9. sync.Mutex
  10. list *skiplist.SkipList
  11. }
  // timeLessThan is the ordering function handed to the skip list: it
  // reports whether l is strictly earlier than r.
  //
  // Both arguments must hold a time.Time; any other dynamic type makes the
  // type assertion panic.
  func timeLessThan(l, r interface{}) bool {
  	return l.(time.Time).Before(r.(time.Time))
  }
  17. func newScheduleQueue() *scheduleQueue {
  18. queue := new(scheduleQueue)
  19. queue.list = skiplist.NewCustomMap(timeLessThan)
  20. return queue
  21. }
  22. func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) {
  23. q.Lock()
  24. defer q.Unlock()
  25. q.list.Set(schedTime, job)
  26. }
  27. // pop out the first job if it's time to run it
  28. func (q *scheduleQueue) Pop() *mirrorJob {
  29. q.Lock()
  30. defer q.Unlock()
  31. first := q.list.SeekToFirst()
  32. if first == nil {
  33. return nil
  34. }
  35. defer first.Close()
  36. t := first.Key().(time.Time)
  37. // logger.Debug("First job should run @%v", t)
  38. if t.Before(time.Now()) {
  39. job := first.Value().(*mirrorJob)
  40. q.list.Delete(first.Key())
  41. return job
  42. }
  43. return nil
  44. }
  45. // remove job
  46. func (q *scheduleQueue) Remove(name string) bool {
  47. q.Lock()
  48. defer q.Unlock()
  49. cur := q.list.Iterator()
  50. defer cur.Close()
  51. for cur.Next() {
  52. cj := cur.Value().(*mirrorJob)
  53. if cj.Name() == name {
  54. q.list.Delete(cur.Key())
  55. return true
  56. }
  57. }
  58. return false
  59. }