Ver código fonte

report next scheduled sync time

zyx 6 anos atrás
pai
commit
ddc9efd155
5 arquivos alterados com 103 adições e 0 exclusões
  1. 10 0
      internal/msg.go
  2. 4 0
      internal/status_web.go
  3. 43 0
      manager/server.go
  4. 19 0
      worker/schedule.go
  5. 27 0
      worker/worker.go

+ 10 - 0
internal/msg.go

@@ -14,6 +14,7 @@ type MirrorStatus struct {
 	Status     SyncStatus `json:"status"`
 	LastUpdate time.Time  `json:"last_update"`
 	LastEnded  time.Time  `json:"last_ended"`
+	Scheduled  time.Time  `json:"next_schedule"`
 	Upstream   string     `json:"upstream"`
 	Size       string     `json:"size"`
 	ErrorMsg   string     `json:"error_msg"`
@@ -28,6 +29,15 @@ type WorkerStatus struct {
 	LastOnline time.Time `json:"last_online"` // last seen
 }
 
+type MirrorSchedules struct {
+	Schedules []MirrorSchedule `json:"schedules"`
+}
+
+type MirrorSchedule struct {
+	MirrorName   string    `json:"name"`
+	NextSchedule time.Time `json:"next_schedule"`
+}
+
 // A CmdVerb is an action to a job or worker
 type CmdVerb uint8
 

+ 4 - 0
internal/status_web.go

@@ -45,6 +45,8 @@ type WebMirrorStatus struct {
 	LastUpdateTs stampTime  `json:"last_update_ts"`
 	LastEnded    textTime   `json:"last_ended"`
 	LastEndedTs  stampTime  `json:"last_ended_ts"`
+	Scheduled    textTime   `json:"next_schedule"`
+	ScheduledTs  stampTime  `json:"next_schedule_ts"`
 	Upstream     string     `json:"upstream"`
 	Size         string     `json:"size"` // approximate size
 }
@@ -58,6 +60,8 @@ func BuildWebMirrorStatus(m MirrorStatus) WebMirrorStatus {
 		LastUpdateTs: stampTime{m.LastUpdate},
 		LastEnded:    textTime{m.LastEnded},
 		LastEndedTs:  stampTime{m.LastEnded},
+		Scheduled:    textTime{m.Scheduled},
+		ScheduledTs:  stampTime{m.Scheduled},
 		Upstream:     m.Upstream,
 		Size:         m.Size,
 	}

+ 43 - 0
manager/server.go

@@ -91,6 +91,7 @@ func GetTUNASyncManager(cfg *Config) *Manager {
 		// post job status
 		workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
 		workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
+		workerValidateGroup.POST(":id/schedules", s.updateSchedulesOfWorker)
 	}
 
 	// for tunasynctl to post commands
@@ -240,6 +241,48 @@ func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
 	})
 }
 
+func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
+	workerID := c.Param("id")
+	var schedules MirrorSchedules
+	c.BindJSON(&schedules)
+
+	for _, schedule := range schedules.Schedules {
+		mirrorName := schedule.MirrorName
+		if len(mirrorName) == 0 {
+			s.returnErrJSON(
+				c, http.StatusBadRequest,
+				errors.New("Mirror Name should not be empty"),
+			)
+		}
+
+		curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
+		if err != nil {
+			fmt.Errorf("failed to get job %s of worker %s: %s",
+				mirrorName, workerID, err.Error(),
+			)
+			continue
+		}
+
+		if curStatus.Scheduled == schedule.NextSchedule {
+			// no changes, skip update
+			continue
+		}
+
+		curStatus.Scheduled = schedule.NextSchedule
+		_, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
+		if err != nil {
+			err := fmt.Errorf("failed to update job %s of worker %s: %s",
+				mirrorName, workerID, err.Error(),
+			)
+			c.Error(err)
+			s.returnErrJSON(c, http.StatusInternalServerError, err)
+			return
+		}
+	}
+	type empty struct{}
+	c.JSON(http.StatusOK, empty{})
+}
+
 func (s *Manager) updateJobOfWorker(c *gin.Context) {
 	workerID := c.Param("id")
 	var status MirrorStatus

+ 19 - 0
worker/schedule.go

@@ -15,6 +15,11 @@ type scheduleQueue struct {
 	jobs map[string]bool
 }
 
+type jobScheduleInfo struct {
+	jobName       string
+	nextScheduled time.Time
+}
+
 func timeLessThan(l, r interface{}) bool {
 	tl := l.(time.Time)
 	tr := r.(time.Time)
@@ -28,6 +33,20 @@ func newScheduleQueue() *scheduleQueue {
 	return queue
 }
 
+func (q *scheduleQueue) GetJobs() (jobs []jobScheduleInfo) {
+	cur := q.list.Iterator()
+	defer cur.Close()
+
+	for cur.Next() {
+		cj := cur.Value().(*mirrorJob)
+		jobs = append(jobs, jobScheduleInfo{
+			cj.Name(),
+			cur.Key().(time.Time),
+		})
+	}
+	return
+}
+
 func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) {
 	q.Lock()
 	defer q.Unlock()

+ 27 - 0
worker/worker.go

@@ -304,6 +304,9 @@ func (w *Worker) runSchedule() {
 
 	w.L.Unlock()
 
+	schedInfo := w.schedule.GetJobs()
+	w.updateSchedInfo(schedInfo)
+
 	tick := time.Tick(5 * time.Second)
 	for {
 		select {
@@ -340,6 +343,9 @@ func (w *Worker) runSchedule() {
 				w.schedule.AddJob(schedTime, job)
 			}
 
+			schedInfo = w.schedule.GetJobs()
+			w.updateSchedInfo(schedInfo)
+
 		case <-tick:
 			// check schedule every 5 seconds
 			if job := w.schedule.Pop(); job != nil {
@@ -421,6 +427,27 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
 	}
 }
 
+func (w *Worker) updateSchedInfo(schedInfo []jobScheduleInfo) {
+	var s []MirrorSchedule
+	for _, sched := range schedInfo {
+		s = append(s, MirrorSchedule{
+			MirrorName:   sched.jobName,
+			NextSchedule: sched.nextScheduled,
+		})
+	}
+	msg := MirrorSchedules{Schedules: s}
+
+	for _, root := range w.cfg.Manager.APIBaseList() {
+		url := fmt.Sprintf(
+			"%s/workers/%s/schedules", root, w.Name(),
+		)
+		logger.Debugf("reporting on manager url: %s", url)
+		if _, err := PostJSON(url, msg, w.httpClient); err != nil {
+			logger.Errorf("Failed to upload schedules: %s", err.Error())
+		}
+	}
+}
+
 func (w *Worker) fetchJobStatus() []MirrorStatus {
 	var mirrorList []MirrorStatus
 	apiBase := w.cfg.Manager.APIBaseList()[0]