server.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package manager
  2. import (
  3. "fmt"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/gin-gonic/gin"
  8. . "github.com/tuna/tunasync/internal"
  9. )
  10. const (
  11. maxQueuedCmdNum = 3
  12. cmdPollTime = 10 * time.Second
  13. )
  14. const (
  15. _errorKey = "error"
  16. _infoKey = "message"
  17. )
  18. type workerStatus struct {
  19. ID string `json:"id"` // worker name
  20. Token string `json:"token"` // session token
  21. LastOnline time.Time `json:"last_online"` // last seen
  22. }
  23. var (
  24. workerChannelMu sync.RWMutex
  25. workerChannels = make(map[string]chan WorkerCmd)
  26. )
  27. type managerServer struct {
  28. *gin.Engine
  29. adapter dbAdapter
  30. }
  31. // listAllJobs repond with all jobs of specified workers
  32. func (s *managerServer) listAllJobs(c *gin.Context) {
  33. mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
  34. if err != nil {
  35. err := fmt.Errorf("failed to list all mirror status: %s",
  36. err.Error(),
  37. )
  38. c.Error(err)
  39. s.returnErrJSON(c, http.StatusInternalServerError, err)
  40. return
  41. }
  42. c.JSON(http.StatusOK, mirrorStatusList)
  43. }
  44. // listWrokers respond with informations of all the workers
  45. func (s *managerServer) listWorkers(c *gin.Context) {
  46. var workerInfos []WorkerInfoMsg
  47. workers, err := s.adapter.ListWorkers()
  48. if err != nil {
  49. err := fmt.Errorf("failed to list workers: %s",
  50. err.Error(),
  51. )
  52. c.Error(err)
  53. s.returnErrJSON(c, http.StatusInternalServerError, err)
  54. return
  55. }
  56. for _, w := range workers {
  57. workerInfos = append(workerInfos,
  58. WorkerInfoMsg{w.ID, w.LastOnline})
  59. }
  60. c.JSON(http.StatusOK, workerInfos)
  61. }
  62. // registerWorker register an newly-online worker
  63. func (s *managerServer) registerWorker(c *gin.Context) {
  64. var _worker workerStatus
  65. c.BindJSON(&_worker)
  66. newWorker, err := s.adapter.CreateWorker(_worker)
  67. if err != nil {
  68. err := fmt.Errorf("failed to register worker: %s",
  69. err.Error(),
  70. )
  71. c.Error(err)
  72. s.returnErrJSON(c, http.StatusInternalServerError, err)
  73. return
  74. }
  75. // create workerCmd channel for this worker
  76. workerChannelMu.Lock()
  77. defer workerChannelMu.Unlock()
  78. workerChannels[_worker.ID] = make(chan WorkerCmd, maxQueuedCmdNum)
  79. c.JSON(http.StatusOK, newWorker)
  80. }
  81. // listJobsOfWorker respond with all the jobs of the specified worker
  82. func (s *managerServer) listJobsOfWorker(c *gin.Context) {
  83. workerID := c.Param("id")
  84. mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
  85. if err != nil {
  86. err := fmt.Errorf("failed to list jobs of worker %s: %s",
  87. workerID, err.Error(),
  88. )
  89. c.Error(err)
  90. s.returnErrJSON(c, http.StatusInternalServerError, err)
  91. return
  92. }
  93. c.JSON(http.StatusOK, mirrorStatusList)
  94. }
  95. func (s *managerServer) returnErrJSON(c *gin.Context, code int, err error) {
  96. c.JSON(code, gin.H{
  97. _errorKey: err.Error(),
  98. })
  99. }
  100. func (s *managerServer) updateJobOfWorker(c *gin.Context) {
  101. workerID := c.Param("id")
  102. var status mirrorStatus
  103. c.BindJSON(&status)
  104. mirrorName := status.Name
  105. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  106. if err != nil {
  107. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  108. mirrorName, workerID, err.Error(),
  109. )
  110. c.Error(err)
  111. s.returnErrJSON(c, http.StatusInternalServerError, err)
  112. return
  113. }
  114. c.JSON(http.StatusOK, newStatus)
  115. }
  116. func (s *managerServer) handleClientCmd(c *gin.Context) {
  117. workerChannelMu.RLock()
  118. defer workerChannelMu.RUnlock()
  119. var clientCmd ClientCmd
  120. c.BindJSON(&clientCmd)
  121. // TODO: decide which worker should do this mirror when WorkerID is null string
  122. workerID := clientCmd.WorkerID
  123. if workerID == "" {
  124. // TODO: decide which worker should do this mirror when WorkerID is null string
  125. logger.Error("handleClientCmd case workerID == \" \" not implemented yet")
  126. c.AbortWithStatus(http.StatusInternalServerError)
  127. return
  128. }
  129. workerChannel, ok := workerChannels[workerID]
  130. if !ok {
  131. err := fmt.Errorf("worker %s is not registered yet", workerID)
  132. s.returnErrJSON(c, http.StatusBadRequest, err)
  133. return
  134. }
  135. // parse client cmd into worker cmd
  136. workerCmd := WorkerCmd{
  137. Cmd: clientCmd.Cmd,
  138. MirrorID: clientCmd.MirrorID,
  139. Args: clientCmd.Args,
  140. }
  141. select {
  142. case workerChannel <- workerCmd:
  143. // successfully insert command to channel
  144. c.JSON(http.StatusOK, struct{}{})
  145. default:
  146. // pending commands for that worker exceed
  147. // the maxQueuedCmdNum threshold
  148. err := fmt.Errorf("pending commands for worker %s exceed"+
  149. "the %d threshold, the command is dropped",
  150. workerID, maxQueuedCmdNum)
  151. c.Error(err)
  152. s.returnErrJSON(c, http.StatusServiceUnavailable, err)
  153. return
  154. }
  155. }
  156. func (s *managerServer) getCmdOfWorker(c *gin.Context) {
  157. workerID := c.Param("id")
  158. workerChannelMu.RLock()
  159. defer workerChannelMu.RUnlock()
  160. workerChannel := workerChannels[workerID]
  161. for {
  162. select {
  163. case _ = <-workerChannel:
  164. // TODO: push new command to worker client
  165. continue
  166. case <-time.After(cmdPollTime):
  167. // time limit exceeded, close the connection
  168. break
  169. }
  170. }
  171. }
  172. func (s *managerServer) setDBAdapter(adapter dbAdapter) {
  173. s.adapter = adapter
  174. }
  175. func makeHTTPServer(debug bool) *managerServer {
  176. // create gin engine
  177. if !debug {
  178. gin.SetMode(gin.ReleaseMode)
  179. }
  180. s := &managerServer{
  181. gin.Default(),
  182. nil,
  183. }
  184. // common log middleware
  185. s.Use(contextErrorLogger)
  186. s.GET("/ping", func(c *gin.Context) {
  187. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  188. })
  189. // list jobs, status page
  190. s.GET("/jobs", s.listAllJobs)
  191. // list workers
  192. s.GET("/workers", s.listWorkers)
  193. // worker online
  194. s.POST("/workers", s.registerWorker)
  195. // workerID should be valid in this route group
  196. workerValidateGroup := s.Group("/workers", s.workerIDValidator)
  197. // get job list
  198. workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
  199. // post job status
  200. workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
  201. // worker command polling
  202. workerValidateGroup.GET(":id/cmd_stream", s.getCmdOfWorker)
  203. // for tunasynctl to post commands
  204. s.POST("/cmd/", s.handleClientCmd)
  205. return s
  206. }