server.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package manager
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. "net/http"
  6. "github.com/gin-gonic/gin"
  7. . "github.com/tuna/tunasync/internal"
  8. )
  9. const (
  10. _errorKey = "error"
  11. _infoKey = "message"
  12. )
  13. var manager *Manager
  14. // A Manager represents a manager server
  15. type Manager struct {
  16. cfg *Config
  17. engine *gin.Engine
  18. adapter dbAdapter
  19. tlsConfig *tls.Config
  20. }
  21. // GetTUNASyncManager returns the manager from config
  22. func GetTUNASyncManager(cfg *Config) *Manager {
  23. if manager != nil {
  24. return manager
  25. }
  26. // create gin engine
  27. if !cfg.Debug {
  28. gin.SetMode(gin.ReleaseMode)
  29. }
  30. s := &Manager{
  31. cfg: cfg,
  32. engine: gin.Default(),
  33. adapter: nil,
  34. tlsConfig: nil,
  35. }
  36. if cfg.Files.CACert != "" {
  37. tlsConfig, err := GetTLSConfig(cfg.Files.CACert)
  38. if err != nil {
  39. logger.Error("Error initializing TLS config: %s", err.Error())
  40. return nil
  41. }
  42. s.tlsConfig = tlsConfig
  43. }
  44. if cfg.Files.DBFile != "" {
  45. adapter, err := makeDBAdapter(cfg.Files.DBType, cfg.Files.DBFile)
  46. if err != nil {
  47. logger.Error("Error initializing DB adapter: %s", err.Error())
  48. return nil
  49. }
  50. s.setDBAdapter(adapter)
  51. }
  52. // common log middleware
  53. s.engine.Use(contextErrorLogger)
  54. s.engine.GET("/ping", func(c *gin.Context) {
  55. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  56. })
  57. // list jobs, status page
  58. s.engine.GET("/jobs", s.listAllJobs)
  59. // list workers
  60. s.engine.GET("/workers", s.listWorkers)
  61. // worker online
  62. s.engine.POST("/workers", s.registerWorker)
  63. // workerID should be valid in this route group
  64. workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
  65. // get job list
  66. workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
  67. // post job status
  68. workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
  69. // for tunasynctl to post commands
  70. s.engine.POST("/cmd", s.handleClientCmd)
  71. manager = s
  72. return s
  73. }
  74. func (s *Manager) setDBAdapter(adapter dbAdapter) {
  75. s.adapter = adapter
  76. }
  77. // Run runs the manager server forever
  78. func (s *Manager) Run() {
  79. addr := fmt.Sprintf("%s:%d", s.cfg.Server.Addr, s.cfg.Server.Port)
  80. if s.cfg.Server.SSLCert == "" && s.cfg.Server.SSLKey == "" {
  81. if err := s.engine.Run(addr); err != nil {
  82. panic(err)
  83. }
  84. } else {
  85. if err := s.engine.RunTLS(addr, s.cfg.Server.SSLCert, s.cfg.Server.SSLKey); err != nil {
  86. panic(err)
  87. }
  88. }
  89. }
  90. // listAllJobs repond with all jobs of specified workers
  91. func (s *Manager) listAllJobs(c *gin.Context) {
  92. mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
  93. if err != nil {
  94. err := fmt.Errorf("failed to list all mirror status: %s",
  95. err.Error(),
  96. )
  97. c.Error(err)
  98. s.returnErrJSON(c, http.StatusInternalServerError, err)
  99. return
  100. }
  101. webMirStatusList := []webMirrorStatus{}
  102. for _, m := range mirrorStatusList {
  103. webMirStatusList = append(
  104. webMirStatusList,
  105. convertMirrorStatus(m),
  106. )
  107. }
  108. c.JSON(http.StatusOK, webMirStatusList)
  109. }
  110. // listWrokers respond with informations of all the workers
  111. func (s *Manager) listWorkers(c *gin.Context) {
  112. var workerInfos []WorkerStatus
  113. workers, err := s.adapter.ListWorkers()
  114. if err != nil {
  115. err := fmt.Errorf("failed to list workers: %s",
  116. err.Error(),
  117. )
  118. c.Error(err)
  119. s.returnErrJSON(c, http.StatusInternalServerError, err)
  120. return
  121. }
  122. for _, w := range workers {
  123. workerInfos = append(workerInfos,
  124. WorkerStatus{
  125. ID: w.ID,
  126. LastOnline: w.LastOnline,
  127. })
  128. }
  129. c.JSON(http.StatusOK, workerInfos)
  130. }
  131. // registerWorker register an newly-online worker
  132. func (s *Manager) registerWorker(c *gin.Context) {
  133. var _worker WorkerStatus
  134. c.BindJSON(&_worker)
  135. newWorker, err := s.adapter.CreateWorker(_worker)
  136. if err != nil {
  137. err := fmt.Errorf("failed to register worker: %s",
  138. err.Error(),
  139. )
  140. c.Error(err)
  141. s.returnErrJSON(c, http.StatusInternalServerError, err)
  142. return
  143. }
  144. // create workerCmd channel for this worker
  145. c.JSON(http.StatusOK, newWorker)
  146. }
  147. // listJobsOfWorker respond with all the jobs of the specified worker
  148. func (s *Manager) listJobsOfWorker(c *gin.Context) {
  149. workerID := c.Param("id")
  150. mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
  151. if err != nil {
  152. err := fmt.Errorf("failed to list jobs of worker %s: %s",
  153. workerID, err.Error(),
  154. )
  155. c.Error(err)
  156. s.returnErrJSON(c, http.StatusInternalServerError, err)
  157. return
  158. }
  159. c.JSON(http.StatusOK, mirrorStatusList)
  160. }
  161. func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
  162. c.JSON(code, gin.H{
  163. _errorKey: err.Error(),
  164. })
  165. }
  166. func (s *Manager) updateJobOfWorker(c *gin.Context) {
  167. workerID := c.Param("id")
  168. var status MirrorStatus
  169. c.BindJSON(&status)
  170. mirrorName := status.Name
  171. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  172. if err != nil {
  173. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  174. mirrorName, workerID, err.Error(),
  175. )
  176. c.Error(err)
  177. s.returnErrJSON(c, http.StatusInternalServerError, err)
  178. return
  179. }
  180. c.JSON(http.StatusOK, newStatus)
  181. }
  182. func (s *Manager) handleClientCmd(c *gin.Context) {
  183. var clientCmd ClientCmd
  184. c.BindJSON(&clientCmd)
  185. workerID := clientCmd.WorkerID
  186. if workerID == "" {
  187. // TODO: decide which worker should do this mirror when WorkerID is null string
  188. logger.Error("handleClientCmd case workerID == \" \" not implemented yet")
  189. c.AbortWithStatus(http.StatusInternalServerError)
  190. return
  191. }
  192. w, err := s.adapter.GetWorker(workerID)
  193. if err != nil {
  194. err := fmt.Errorf("worker %s is not registered yet", workerID)
  195. s.returnErrJSON(c, http.StatusBadRequest, err)
  196. return
  197. }
  198. workerURL := w.URL
  199. // parse client cmd into worker cmd
  200. workerCmd := WorkerCmd{
  201. Cmd: clientCmd.Cmd,
  202. MirrorID: clientCmd.MirrorID,
  203. Args: clientCmd.Args,
  204. }
  205. // post command to worker
  206. _, err = postJSON(workerURL, workerCmd)
  207. if err != nil {
  208. err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error())
  209. c.Error(err)
  210. s.returnErrJSON(c, http.StatusInternalServerError, err)
  211. return
  212. }
  213. // TODO: check response for success
  214. c.JSON(http.StatusOK, gin.H{_infoKey: "successfully send command to worker " + workerID})
  215. }