server.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package manager
  2. import (
  3. "fmt"
  4. "github.com/gin-gonic/gin"
  5. . "github.com/tuna/tunasync/internal"
  6. "net/http"
  7. "sync"
  8. "time"
  9. )
  // Limits governing the per-worker command queue.
  const (
  	// maxQueuedCmdNum is the buffer capacity of each worker's command channel.
  	maxQueuedCmdNum = 3
  	// cmdPollTime is how long a command-polling request waits for a command.
  	cmdPollTime = 10 * time.Second
  )
  // Keys used in the JSON objects sent back to clients.
  const (
  	// _errorKey is the key under which an error message is reported.
  	_errorKey = "error"
  	// _infoKey is the key under which an informational message is reported.
  	_infoKey = "message"
  )
  // worker is the JSON description of a worker. It is decoded from the
  // registration request body and passed to the DB adapter.
  type worker struct {
  	// ID is the worker name.
  	ID string `json:"id"`
  	// Token is the session token.
  	Token string `json:"token"`
  }
  22. var (
  23. workerChannelMu sync.RWMutex
  24. workerChannels = make(map[string]chan WorkerCmd)
  25. )
  26. type managerServer struct {
  27. *gin.Engine
  28. adapter dbAdapter
  29. }
  30. // listAllJobs repond with all jobs of specified workers
  31. func (s *managerServer) listAllJobs(c *gin.Context) {
  32. mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
  33. if err != nil {
  34. err := fmt.Errorf("failed to list all mirror status: %s",
  35. err.Error(),
  36. )
  37. c.Error(err)
  38. s.returnErrJSON(c, http.StatusInternalServerError, err)
  39. return
  40. }
  41. c.JSON(http.StatusOK, mirrorStatusList)
  42. }
  43. // listWrokers respond with informations of all the workers
  44. func (s *managerServer) listWorkers(c *gin.Context) {
  45. var workerInfos []WorkerInfoMsg
  46. workers, err := s.adapter.ListWorkers()
  47. if err != nil {
  48. err := fmt.Errorf("failed to list workers: %s",
  49. err.Error(),
  50. )
  51. c.Error(err)
  52. s.returnErrJSON(c, http.StatusInternalServerError, err)
  53. return
  54. }
  55. for _, w := range workers {
  56. workerInfos = append(workerInfos,
  57. WorkerInfoMsg{w.ID})
  58. }
  59. c.JSON(http.StatusOK, workerInfos)
  60. }
  61. // registerWorker register an newly-online worker
  62. func (s *managerServer) registerWorker(c *gin.Context) {
  63. var _worker worker
  64. c.BindJSON(&_worker)
  65. newWorker, err := s.adapter.CreateWorker(_worker)
  66. if err != nil {
  67. err := fmt.Errorf("failed to register worker: %s",
  68. err.Error(),
  69. )
  70. c.Error(err)
  71. s.returnErrJSON(c, http.StatusInternalServerError, err)
  72. return
  73. }
  74. // create workerCmd channel for this worker
  75. workerChannelMu.Lock()
  76. defer workerChannelMu.Unlock()
  77. workerChannels[_worker.ID] = make(chan WorkerCmd, maxQueuedCmdNum)
  78. c.JSON(http.StatusOK, newWorker)
  79. }
  80. // listJobsOfWorker respond with all the jobs of the specified worker
  81. func (s *managerServer) listJobsOfWorker(c *gin.Context) {
  82. workerID := c.Param("id")
  83. mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
  84. if err != nil {
  85. err := fmt.Errorf("failed to list jobs of worker %s: %s",
  86. workerID, err.Error(),
  87. )
  88. c.Error(err)
  89. s.returnErrJSON(c, http.StatusInternalServerError, err)
  90. return
  91. }
  92. c.JSON(http.StatusOK, mirrorStatusList)
  93. }
  94. func (s *managerServer) returnErrJSON(c *gin.Context, code int, err error) {
  95. c.JSON(code, gin.H{
  96. _errorKey: err.Error(),
  97. })
  98. }
  99. func (s *managerServer) updateJobOfWorker(c *gin.Context) {
  100. workerID := c.Param("id")
  101. var status mirrorStatus
  102. c.BindJSON(&status)
  103. mirrorName := status.Name
  104. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  105. if err != nil {
  106. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  107. mirrorName, workerID, err.Error(),
  108. )
  109. c.Error(err)
  110. s.returnErrJSON(c, http.StatusInternalServerError, err)
  111. return
  112. }
  113. c.JSON(http.StatusOK, newStatus)
  114. }
  115. func (s *managerServer) handleClientCmd(c *gin.Context) {
  116. workerChannelMu.RLock()
  117. defer workerChannelMu.RUnlock()
  118. var clientCmd ClientCmd
  119. c.BindJSON(&clientCmd)
  120. // TODO: decide which worker should do this mirror when WorkerID is null string
  121. workerID := clientCmd.WorkerID
  122. if workerID == "" {
  123. // TODO: decide which worker should do this mirror when WorkerID is null string
  124. logger.Error("handleClientCmd case workerID == \" \" not implemented yet")
  125. c.AbortWithStatus(http.StatusInternalServerError)
  126. return
  127. }
  128. workerChannel, ok := workerChannels[workerID]
  129. if !ok {
  130. err := fmt.Errorf("worker %s is not registered yet", workerID)
  131. s.returnErrJSON(c, http.StatusBadRequest, err)
  132. return
  133. }
  134. // parse client cmd into worker cmd
  135. workerCmd := WorkerCmd{
  136. Cmd: clientCmd.Cmd,
  137. MirrorID: clientCmd.MirrorID,
  138. Args: clientCmd.Args,
  139. }
  140. select {
  141. case workerChannel <- workerCmd:
  142. // successfully insert command to channel
  143. c.JSON(http.StatusOK, struct{}{})
  144. default:
  145. // pending commands for that worker exceed
  146. // the maxQueuedCmdNum threshold
  147. err := fmt.Errorf("pending commands for worker %s exceed"+
  148. "the %d threshold, the command is dropped",
  149. workerID, maxQueuedCmdNum)
  150. c.Error(err)
  151. s.returnErrJSON(c, http.StatusServiceUnavailable, err)
  152. return
  153. }
  154. }
  155. func (s *managerServer) getCmdOfWorker(c *gin.Context) {
  156. workerID := c.Param("id")
  157. workerChannelMu.RLock()
  158. defer workerChannelMu.RUnlock()
  159. workerChannel := workerChannels[workerID]
  160. for {
  161. select {
  162. case _ = <-workerChannel:
  163. // TODO: push new command to worker client
  164. continue
  165. case <-time.After(cmdPollTime):
  166. // time limit exceeded, close the connection
  167. break
  168. }
  169. }
  170. }
  171. func (s *managerServer) setDBAdapter(adapter dbAdapter) {
  172. s.adapter = adapter
  173. }
  174. func makeHTTPServer(debug bool) *managerServer {
  175. // create gin engine
  176. if !debug {
  177. gin.SetMode(gin.ReleaseMode)
  178. }
  179. s := &managerServer{
  180. gin.Default(),
  181. nil,
  182. }
  183. // common log middleware
  184. s.Use(contextErrorLogger)
  185. s.GET("/ping", func(c *gin.Context) {
  186. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  187. })
  188. // list jobs, status page
  189. s.GET("/jobs", s.listAllJobs)
  190. // list workers
  191. s.GET("/workers", s.listWorkers)
  192. // worker online
  193. s.POST("/workers", s.registerWorker)
  194. // workerID should be valid in this route group
  195. workerValidateGroup := s.Group("/workers", s.workerIDValidator)
  196. // get job list
  197. workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
  198. // post job status
  199. workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
  200. // worker command polling
  201. workerValidateGroup.GET(":id/cmd_stream", s.getCmdOfWorker)
  202. // for tunasynctl to post commands
  203. s.POST("/cmd/", s.handleClientCmd)
  204. return s
  205. }