server.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package manager
  2. import (
  3. "fmt"
  4. "net/http"
  5. "github.com/gin-gonic/gin"
  6. . "github.com/tuna/tunasync/internal"
  7. )
  8. const (
  9. _errorKey = "error"
  10. _infoKey = "message"
  11. )
  12. type managerServer struct {
  13. *gin.Engine
  14. adapter dbAdapter
  15. }
  16. // listAllJobs repond with all jobs of specified workers
  17. func (s *managerServer) listAllJobs(c *gin.Context) {
  18. mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
  19. if err != nil {
  20. err := fmt.Errorf("failed to list all mirror status: %s",
  21. err.Error(),
  22. )
  23. c.Error(err)
  24. s.returnErrJSON(c, http.StatusInternalServerError, err)
  25. return
  26. }
  27. webMirStatusList := []webMirrorStatus{}
  28. for _, m := range mirrorStatusList {
  29. webMirStatusList = append(
  30. webMirStatusList,
  31. convertMirrorStatus(m),
  32. )
  33. }
  34. c.JSON(http.StatusOK, webMirStatusList)
  35. }
  36. // listWrokers respond with informations of all the workers
  37. func (s *managerServer) listWorkers(c *gin.Context) {
  38. var workerInfos []WorkerStatus
  39. workers, err := s.adapter.ListWorkers()
  40. if err != nil {
  41. err := fmt.Errorf("failed to list workers: %s",
  42. err.Error(),
  43. )
  44. c.Error(err)
  45. s.returnErrJSON(c, http.StatusInternalServerError, err)
  46. return
  47. }
  48. for _, w := range workers {
  49. workerInfos = append(workerInfos,
  50. WorkerStatus{
  51. ID: w.ID,
  52. LastOnline: w.LastOnline,
  53. })
  54. }
  55. c.JSON(http.StatusOK, workerInfos)
  56. }
  57. // registerWorker register an newly-online worker
  58. func (s *managerServer) registerWorker(c *gin.Context) {
  59. var _worker WorkerStatus
  60. c.BindJSON(&_worker)
  61. newWorker, err := s.adapter.CreateWorker(_worker)
  62. if err != nil {
  63. err := fmt.Errorf("failed to register worker: %s",
  64. err.Error(),
  65. )
  66. c.Error(err)
  67. s.returnErrJSON(c, http.StatusInternalServerError, err)
  68. return
  69. }
  70. // create workerCmd channel for this worker
  71. c.JSON(http.StatusOK, newWorker)
  72. }
  73. // listJobsOfWorker respond with all the jobs of the specified worker
  74. func (s *managerServer) listJobsOfWorker(c *gin.Context) {
  75. workerID := c.Param("id")
  76. mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
  77. if err != nil {
  78. err := fmt.Errorf("failed to list jobs of worker %s: %s",
  79. workerID, err.Error(),
  80. )
  81. c.Error(err)
  82. s.returnErrJSON(c, http.StatusInternalServerError, err)
  83. return
  84. }
  85. c.JSON(http.StatusOK, mirrorStatusList)
  86. }
  87. func (s *managerServer) returnErrJSON(c *gin.Context, code int, err error) {
  88. c.JSON(code, gin.H{
  89. _errorKey: err.Error(),
  90. })
  91. }
  92. func (s *managerServer) updateJobOfWorker(c *gin.Context) {
  93. workerID := c.Param("id")
  94. var status MirrorStatus
  95. c.BindJSON(&status)
  96. mirrorName := status.Name
  97. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  98. if err != nil {
  99. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  100. mirrorName, workerID, err.Error(),
  101. )
  102. c.Error(err)
  103. s.returnErrJSON(c, http.StatusInternalServerError, err)
  104. return
  105. }
  106. c.JSON(http.StatusOK, newStatus)
  107. }
  108. func (s *managerServer) handleClientCmd(c *gin.Context) {
  109. var clientCmd ClientCmd
  110. c.BindJSON(&clientCmd)
  111. workerID := clientCmd.WorkerID
  112. if workerID == "" {
  113. // TODO: decide which worker should do this mirror when WorkerID is null string
  114. logger.Error("handleClientCmd case workerID == \" \" not implemented yet")
  115. c.AbortWithStatus(http.StatusInternalServerError)
  116. return
  117. }
  118. w, err := s.adapter.GetWorker(workerID)
  119. if err != nil {
  120. err := fmt.Errorf("worker %s is not registered yet", workerID)
  121. s.returnErrJSON(c, http.StatusBadRequest, err)
  122. return
  123. }
  124. workerURL := w.URL
  125. // parse client cmd into worker cmd
  126. workerCmd := WorkerCmd{
  127. Cmd: clientCmd.Cmd,
  128. MirrorID: clientCmd.MirrorID,
  129. Args: clientCmd.Args,
  130. }
  131. // post command to worker
  132. _, err = postJSON(workerURL, workerCmd)
  133. if err != nil {
  134. err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error())
  135. c.Error(err)
  136. s.returnErrJSON(c, http.StatusInternalServerError, err)
  137. return
  138. }
  139. // TODO: check response for success
  140. c.JSON(http.StatusOK, gin.H{_infoKey: "successfully send command to worker " + workerID})
  141. }
  142. func (s *managerServer) setDBAdapter(adapter dbAdapter) {
  143. s.adapter = adapter
  144. }
  145. func makeHTTPServer(debug bool) *managerServer {
  146. // create gin engine
  147. if !debug {
  148. gin.SetMode(gin.ReleaseMode)
  149. }
  150. s := &managerServer{
  151. gin.Default(),
  152. nil,
  153. }
  154. // common log middleware
  155. s.Use(contextErrorLogger)
  156. s.GET("/ping", func(c *gin.Context) {
  157. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  158. })
  159. // list jobs, status page
  160. s.GET("/jobs", s.listAllJobs)
  161. // list workers
  162. s.GET("/workers", s.listWorkers)
  163. // worker online
  164. s.POST("/workers", s.registerWorker)
  165. // workerID should be valid in this route group
  166. workerValidateGroup := s.Group("/workers", s.workerIDValidator)
  167. // get job list
  168. workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
  169. // post job status
  170. workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
  171. // for tunasynctl to post commands
  172. s.POST("/cmd", s.handleClientCmd)
  173. return s
  174. }