server.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package manager
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "github.com/gin-gonic/gin"
  8. . "github.com/tuna/tunasync/internal"
  9. )
  10. const (
  11. _errorKey = "error"
  12. _infoKey = "message"
  13. )
  14. var manager *Manager
  15. // A Manager represents a manager server
  16. type Manager struct {
  17. cfg *Config
  18. engine *gin.Engine
  19. adapter dbAdapter
  20. tlsConfig *tls.Config
  21. }
  22. // GetTUNASyncManager returns the manager from config
  23. func GetTUNASyncManager(cfg *Config) *Manager {
  24. if manager != nil {
  25. return manager
  26. }
  27. // create gin engine
  28. if !cfg.Debug {
  29. gin.SetMode(gin.ReleaseMode)
  30. }
  31. s := &Manager{
  32. cfg: cfg,
  33. engine: gin.Default(),
  34. adapter: nil,
  35. tlsConfig: nil,
  36. }
  37. if cfg.Files.CACert != "" {
  38. tlsConfig, err := GetTLSConfig(cfg.Files.CACert)
  39. if err != nil {
  40. logger.Error("Error initializing TLS config: %s", err.Error())
  41. return nil
  42. }
  43. s.tlsConfig = tlsConfig
  44. }
  45. if cfg.Files.DBFile != "" {
  46. adapter, err := makeDBAdapter(cfg.Files.DBType, cfg.Files.DBFile)
  47. if err != nil {
  48. logger.Error("Error initializing DB adapter: %s", err.Error())
  49. return nil
  50. }
  51. s.setDBAdapter(adapter)
  52. }
  53. // common log middleware
  54. s.engine.Use(contextErrorLogger)
  55. s.engine.GET("/ping", func(c *gin.Context) {
  56. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  57. })
  58. // list jobs, status page
  59. s.engine.GET("/jobs", s.listAllJobs)
  60. // list workers
  61. s.engine.GET("/workers", s.listWorkers)
  62. // worker online
  63. s.engine.POST("/workers", s.registerWorker)
  64. // workerID should be valid in this route group
  65. workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
  66. // get job list
  67. workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
  68. // post job status
  69. workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
  70. // for tunasynctl to post commands
  71. s.engine.POST("/cmd", s.handleClientCmd)
  72. manager = s
  73. return s
  74. }
  75. func (s *Manager) setDBAdapter(adapter dbAdapter) {
  76. s.adapter = adapter
  77. }
  78. // Run runs the manager server forever
  79. func (s *Manager) Run() {
  80. addr := fmt.Sprintf("%s:%d", s.cfg.Server.Addr, s.cfg.Server.Port)
  81. if s.cfg.Server.SSLCert == "" && s.cfg.Server.SSLKey == "" {
  82. if err := s.engine.Run(addr); err != nil {
  83. panic(err)
  84. }
  85. } else {
  86. if err := s.engine.RunTLS(addr, s.cfg.Server.SSLCert, s.cfg.Server.SSLKey); err != nil {
  87. panic(err)
  88. }
  89. }
  90. }
  91. // listAllJobs repond with all jobs of specified workers
  92. func (s *Manager) listAllJobs(c *gin.Context) {
  93. mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
  94. if err != nil {
  95. err := fmt.Errorf("failed to list all mirror status: %s",
  96. err.Error(),
  97. )
  98. c.Error(err)
  99. s.returnErrJSON(c, http.StatusInternalServerError, err)
  100. return
  101. }
  102. webMirStatusList := []webMirrorStatus{}
  103. for _, m := range mirrorStatusList {
  104. webMirStatusList = append(
  105. webMirStatusList,
  106. convertMirrorStatus(m),
  107. )
  108. }
  109. c.JSON(http.StatusOK, webMirStatusList)
  110. }
  111. // listWrokers respond with informations of all the workers
  112. func (s *Manager) listWorkers(c *gin.Context) {
  113. var workerInfos []WorkerStatus
  114. workers, err := s.adapter.ListWorkers()
  115. if err != nil {
  116. err := fmt.Errorf("failed to list workers: %s",
  117. err.Error(),
  118. )
  119. c.Error(err)
  120. s.returnErrJSON(c, http.StatusInternalServerError, err)
  121. return
  122. }
  123. for _, w := range workers {
  124. workerInfos = append(workerInfos,
  125. WorkerStatus{
  126. ID: w.ID,
  127. LastOnline: w.LastOnline,
  128. })
  129. }
  130. c.JSON(http.StatusOK, workerInfos)
  131. }
  132. // registerWorker register an newly-online worker
  133. func (s *Manager) registerWorker(c *gin.Context) {
  134. var _worker WorkerStatus
  135. c.BindJSON(&_worker)
  136. _worker.LastOnline = time.Now()
  137. newWorker, err := s.adapter.CreateWorker(_worker)
  138. if err != nil {
  139. err := fmt.Errorf("failed to register worker: %s",
  140. err.Error(),
  141. )
  142. c.Error(err)
  143. s.returnErrJSON(c, http.StatusInternalServerError, err)
  144. return
  145. }
  146. // create workerCmd channel for this worker
  147. c.JSON(http.StatusOK, newWorker)
  148. }
  149. // listJobsOfWorker respond with all the jobs of the specified worker
  150. func (s *Manager) listJobsOfWorker(c *gin.Context) {
  151. workerID := c.Param("id")
  152. mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
  153. if err != nil {
  154. err := fmt.Errorf("failed to list jobs of worker %s: %s",
  155. workerID, err.Error(),
  156. )
  157. c.Error(err)
  158. s.returnErrJSON(c, http.StatusInternalServerError, err)
  159. return
  160. }
  161. c.JSON(http.StatusOK, mirrorStatusList)
  162. }
  163. func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
  164. c.JSON(code, gin.H{
  165. _errorKey: err.Error(),
  166. })
  167. }
  168. func (s *Manager) updateJobOfWorker(c *gin.Context) {
  169. workerID := c.Param("id")
  170. var status MirrorStatus
  171. c.BindJSON(&status)
  172. mirrorName := status.Name
  173. curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
  174. if err != nil {
  175. err := fmt.Errorf("failed to get job %s of worker %s: %s",
  176. mirrorName, workerID, err.Error(),
  177. )
  178. c.Error(err)
  179. s.returnErrJSON(c, http.StatusInternalServerError, err)
  180. return
  181. }
  182. // Only successful syncing needs last_update
  183. if status.Status == Success {
  184. status.LastUpdate = time.Now()
  185. } else {
  186. status.LastUpdate = curStatus.LastUpdate
  187. }
  188. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  189. if err != nil {
  190. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  191. mirrorName, workerID, err.Error(),
  192. )
  193. c.Error(err)
  194. s.returnErrJSON(c, http.StatusInternalServerError, err)
  195. return
  196. }
  197. c.JSON(http.StatusOK, newStatus)
  198. }
  199. func (s *Manager) handleClientCmd(c *gin.Context) {
  200. var clientCmd ClientCmd
  201. c.BindJSON(&clientCmd)
  202. workerID := clientCmd.WorkerID
  203. if workerID == "" {
  204. // TODO: decide which worker should do this mirror when WorkerID is null string
  205. logger.Error("handleClientCmd case workerID == \" \" not implemented yet")
  206. c.AbortWithStatus(http.StatusInternalServerError)
  207. return
  208. }
  209. w, err := s.adapter.GetWorker(workerID)
  210. if err != nil {
  211. err := fmt.Errorf("worker %s is not registered yet", workerID)
  212. s.returnErrJSON(c, http.StatusBadRequest, err)
  213. return
  214. }
  215. workerURL := w.URL
  216. // parse client cmd into worker cmd
  217. workerCmd := WorkerCmd{
  218. Cmd: clientCmd.Cmd,
  219. MirrorID: clientCmd.MirrorID,
  220. Args: clientCmd.Args,
  221. }
  222. // update job status, even if the job did not disable successfully,
  223. // this status should be set as disabled
  224. curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
  225. changed := false
  226. switch clientCmd.Cmd {
  227. case CmdDisable:
  228. curStat.Status = Disabled
  229. changed = true
  230. case CmdStop:
  231. curStat.Status = Paused
  232. changed = true
  233. }
  234. if changed {
  235. s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
  236. }
  237. // post command to worker
  238. _, err = PostJSON(workerURL, workerCmd, s.tlsConfig)
  239. if err != nil {
  240. err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error())
  241. c.Error(err)
  242. s.returnErrJSON(c, http.StatusInternalServerError, err)
  243. return
  244. }
  245. // TODO: check response for success
  246. c.JSON(http.StatusOK, gin.H{_infoKey: "successfully send command to worker " + workerID})
  247. }