server.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package manager
  2. import (
  3. "fmt"
  4. "github.com/gin-gonic/gin"
  5. . "github.com/tuna/tunasync/internal"
  6. "net/http"
  7. "sync"
  8. "time"
  9. )
  10. const (
  11. maxQueuedCmdNum = 3
  12. cmdPollTime = 10 * time.Second
  13. )
  14. const (
  15. _errorKey = "error"
  16. _infoKey = "message"
  17. )
  18. type worker struct {
  19. // worker name
  20. id string
  21. // session token
  22. token string
  23. }
  24. var (
  25. workerChannelMu sync.RWMutex
  26. workerChannels = make(map[string]chan WorkerCmd)
  27. )
  28. type managerServer struct {
  29. *gin.Engine
  30. adapter dbAdapter
  31. }
  32. // listAllJobs repond with all jobs of specified workers
  33. func (s *managerServer) listAllJobs(c *gin.Context) {
  34. mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
  35. if err != nil {
  36. err := fmt.Errorf("failed to list all mirror status: %s",
  37. err.Error(),
  38. )
  39. c.Error(err)
  40. s.returnErrJSON(c, http.StatusInternalServerError, err)
  41. return
  42. }
  43. c.JSON(http.StatusOK, mirrorStatusList)
  44. }
  45. // listWrokers respond with informations of all the workers
  46. func (s *managerServer) listWorkers(c *gin.Context) {
  47. var workerInfos []WorkerInfoMsg
  48. workers, err := s.adapter.ListWorkers()
  49. if err != nil {
  50. err := fmt.Errorf("failed to list workers: %s",
  51. err.Error(),
  52. )
  53. c.Error(err)
  54. s.returnErrJSON(c, http.StatusInternalServerError, err)
  55. return
  56. }
  57. for _, w := range workers {
  58. workerInfos = append(workerInfos,
  59. WorkerInfoMsg{w.id})
  60. }
  61. c.JSON(http.StatusOK, workerInfos)
  62. }
  63. // registerWorker register an newly-online worker
  64. func (s *managerServer) registerWorker(c *gin.Context) {
  65. var _worker worker
  66. c.BindJSON(&_worker)
  67. newWorker, err := s.adapter.CreateWorker(_worker)
  68. if err != nil {
  69. err := fmt.Errorf("failed to register worker: %s",
  70. err.Error(),
  71. )
  72. c.Error(err)
  73. s.returnErrJSON(c, http.StatusInternalServerError, err)
  74. return
  75. }
  76. // create workerCmd channel for this worker
  77. workerChannelMu.Lock()
  78. defer workerChannelMu.Unlock()
  79. workerChannels[_worker.id] = make(chan WorkerCmd, maxQueuedCmdNum)
  80. c.JSON(http.StatusOK, newWorker)
  81. }
  82. // listJobsOfWorker respond with all the jobs of the specified worker
  83. func (s *managerServer) listJobsOfWorker(c *gin.Context) {
  84. workerID := c.Param("id")
  85. mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
  86. if err != nil {
  87. err := fmt.Errorf("failed to list jobs of worker %s: %s",
  88. workerID, err.Error(),
  89. )
  90. c.Error(err)
  91. s.returnErrJSON(c, http.StatusInternalServerError, err)
  92. return
  93. }
  94. c.JSON(http.StatusOK, mirrorStatusList)
  95. }
  96. func (s *managerServer) returnErrJSON(c *gin.Context, code int, err error) {
  97. c.JSON(code, gin.H{
  98. _errorKey: err.Error(),
  99. })
  100. }
  101. func (s *managerServer) updateJobOfWorker(c *gin.Context) {
  102. workerID := c.Param("id")
  103. var status mirrorStatus
  104. c.BindJSON(&status)
  105. mirrorName := status.Name
  106. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  107. if err != nil {
  108. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  109. mirrorName, workerID, err.Error(),
  110. )
  111. c.Error(err)
  112. s.returnErrJSON(c, http.StatusInternalServerError, err)
  113. return
  114. }
  115. c.JSON(http.StatusOK, newStatus)
  116. }
  117. func (s *managerServer) handleClientCmd(c *gin.Context) {
  118. workerChannelMu.RLock()
  119. defer workerChannelMu.RUnlock()
  120. var clientCmd ClientCmd
  121. c.BindJSON(&clientCmd)
  122. // TODO: decide which worker should do this mirror when WorkerID is null string
  123. workerID := clientCmd.WorkerID
  124. if workerID == "" {
  125. // TODO: decide which worker should do this mirror when WorkerID is null string
  126. logger.Error("handleClientCmd case workerID == \" \" not implemented yet")
  127. c.AbortWithStatus(http.StatusInternalServerError)
  128. return
  129. }
  130. workerChannel, ok := workerChannels[workerID]
  131. if !ok {
  132. err := fmt.Errorf("worker %s is not registered yet", workerID)
  133. s.returnErrJSON(c, http.StatusBadRequest, err)
  134. return
  135. }
  136. // parse client cmd into worker cmd
  137. workerCmd := WorkerCmd{
  138. Cmd: clientCmd.Cmd,
  139. MirrorID: clientCmd.MirrorID,
  140. Args: clientCmd.Args,
  141. }
  142. select {
  143. case workerChannel <- workerCmd:
  144. // successfully insert command to channel
  145. c.JSON(http.StatusOK, struct{}{})
  146. default:
  147. // pending commands for that worker exceed
  148. // the maxQueuedCmdNum threshold
  149. err := fmt.Errorf("pending commands for worker %s exceed"+
  150. "the %d threshold, the command is dropped",
  151. workerID, maxQueuedCmdNum)
  152. c.Error(err)
  153. s.returnErrJSON(c, http.StatusServiceUnavailable, err)
  154. return
  155. }
  156. }
  157. func (s *managerServer) getCmdOfWorker(c *gin.Context) {
  158. workerID := c.Param("id")
  159. workerChannelMu.RLock()
  160. defer workerChannelMu.RUnlock()
  161. workerChannel := workerChannels[workerID]
  162. for {
  163. select {
  164. case _ = <-workerChannel:
  165. // TODO: push new command to worker client
  166. continue
  167. case <-time.After(cmdPollTime):
  168. // time limit exceeded, close the connection
  169. break
  170. }
  171. }
  172. }
  173. func (s *managerServer) setDBAdapter(adapter dbAdapter) {
  174. s.adapter = adapter
  175. }
  176. func makeHTTPServer(debug bool) *managerServer {
  177. // create gin engine
  178. if !debug {
  179. gin.SetMode(gin.ReleaseMode)
  180. }
  181. s := &managerServer{
  182. gin.Default(),
  183. nil,
  184. }
  185. s.GET("/ping", func(c *gin.Context) {
  186. c.JSON(http.StatusOK, gin.H{"msg": "pong"})
  187. })
  188. // list jobs, status page
  189. s.GET("/jobs", s.listAllJobs)
  190. // list workers
  191. s.GET("/workers", s.listWorkers)
  192. // worker online
  193. s.POST("/workers/:id", s.registerWorker)
  194. // get job list
  195. s.GET("/workers/:id/jobs", s.listJobsOfWorker)
  196. // post job status
  197. s.POST("/workers/:id/jobs/:job", s.updateJobOfWorker)
  198. // worker command polling
  199. s.GET("/workers/:id/cmd_stream", s.getCmdOfWorker)
  200. // for tunasynctl to post commands
  201. s.POST("/cmd/", s.handleClientCmd)
  202. return s
  203. }