server.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package manager
  2. import (
  3. "fmt"
  4. "net/http"
  5. "time"
  6. "github.com/gin-gonic/gin"
  7. . "github.com/tuna/tunasync/internal"
  8. )
  9. const (
  10. _errorKey = "error"
  11. _infoKey = "message"
  12. )
  13. var manager *Manager
  14. // A Manager represents a manager server
  15. type Manager struct {
  16. cfg *Config
  17. engine *gin.Engine
  18. adapter dbAdapter
  19. httpClient *http.Client
  20. }
  21. // GetTUNASyncManager returns the manager from config
  22. func GetTUNASyncManager(cfg *Config) *Manager {
  23. if manager != nil {
  24. return manager
  25. }
  26. // create gin engine
  27. if !cfg.Debug {
  28. gin.SetMode(gin.ReleaseMode)
  29. }
  30. s := &Manager{
  31. cfg: cfg,
  32. adapter: nil,
  33. }
  34. s.engine = gin.New()
  35. s.engine.Use(gin.Recovery())
  36. if cfg.Debug {
  37. s.engine.Use(gin.Logger())
  38. }
  39. if cfg.Files.CACert != "" {
  40. httpClient, err := CreateHTTPClient(cfg.Files.CACert)
  41. if err != nil {
  42. logger.Errorf("Error initializing HTTP client: %s", err.Error())
  43. return nil
  44. }
  45. s.httpClient = httpClient
  46. }
  47. if cfg.Files.DBFile != "" {
  48. adapter, err := makeDBAdapter(cfg.Files.DBType, cfg.Files.DBFile)
  49. if err != nil {
  50. logger.Errorf("Error initializing DB adapter: %s", err.Error())
  51. return nil
  52. }
  53. s.setDBAdapter(adapter)
  54. }
  55. // common log middleware
  56. s.engine.Use(contextErrorLogger)
  57. s.engine.GET("/ping", func(c *gin.Context) {
  58. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  59. })
  60. // list jobs, status page
  61. s.engine.GET("/jobs", s.listAllJobs)
  62. // list workers
  63. s.engine.GET("/workers", s.listWorkers)
  64. // worker online
  65. s.engine.POST("/workers", s.registerWorker)
  66. // workerID should be valid in this route group
  67. workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
  68. // get job list
  69. workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
  70. // post job status
  71. workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
  72. // for tunasynctl to post commands
  73. s.engine.POST("/cmd", s.handleClientCmd)
  74. manager = s
  75. return s
  76. }
  77. func (s *Manager) setDBAdapter(adapter dbAdapter) {
  78. s.adapter = adapter
  79. }
  80. // Run runs the manager server forever
  81. func (s *Manager) Run() {
  82. addr := fmt.Sprintf("%s:%d", s.cfg.Server.Addr, s.cfg.Server.Port)
  83. httpServer := &http.Server{
  84. Addr: addr,
  85. Handler: s.engine,
  86. ReadTimeout: 10 * time.Second,
  87. WriteTimeout: 10 * time.Second,
  88. }
  89. if s.cfg.Server.SSLCert == "" && s.cfg.Server.SSLKey == "" {
  90. if err := httpServer.ListenAndServe(); err != nil {
  91. panic(err)
  92. }
  93. } else {
  94. if err := httpServer.ListenAndServeTLS(s.cfg.Server.SSLCert, s.cfg.Server.SSLKey); err != nil {
  95. panic(err)
  96. }
  97. }
  98. }
  99. // listAllJobs repond with all jobs of specified workers
  100. func (s *Manager) listAllJobs(c *gin.Context) {
  101. mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
  102. if err != nil {
  103. err := fmt.Errorf("failed to list all mirror status: %s",
  104. err.Error(),
  105. )
  106. c.Error(err)
  107. s.returnErrJSON(c, http.StatusInternalServerError, err)
  108. return
  109. }
  110. webMirStatusList := []webMirrorStatus{}
  111. for _, m := range mirrorStatusList {
  112. webMirStatusList = append(
  113. webMirStatusList,
  114. convertMirrorStatus(m),
  115. )
  116. }
  117. c.JSON(http.StatusOK, webMirStatusList)
  118. }
  119. // listWrokers respond with informations of all the workers
  120. func (s *Manager) listWorkers(c *gin.Context) {
  121. var workerInfos []WorkerStatus
  122. workers, err := s.adapter.ListWorkers()
  123. if err != nil {
  124. err := fmt.Errorf("failed to list workers: %s",
  125. err.Error(),
  126. )
  127. c.Error(err)
  128. s.returnErrJSON(c, http.StatusInternalServerError, err)
  129. return
  130. }
  131. for _, w := range workers {
  132. workerInfos = append(workerInfos,
  133. WorkerStatus{
  134. ID: w.ID,
  135. LastOnline: w.LastOnline,
  136. })
  137. }
  138. c.JSON(http.StatusOK, workerInfos)
  139. }
  140. // registerWorker register an newly-online worker
  141. func (s *Manager) registerWorker(c *gin.Context) {
  142. var _worker WorkerStatus
  143. c.BindJSON(&_worker)
  144. _worker.LastOnline = time.Now()
  145. newWorker, err := s.adapter.CreateWorker(_worker)
  146. if err != nil {
  147. err := fmt.Errorf("failed to register worker: %s",
  148. err.Error(),
  149. )
  150. c.Error(err)
  151. s.returnErrJSON(c, http.StatusInternalServerError, err)
  152. return
  153. }
  154. logger.Noticef("Worker <%s> registered", _worker.ID)
  155. // create workerCmd channel for this worker
  156. c.JSON(http.StatusOK, newWorker)
  157. }
  158. // listJobsOfWorker respond with all the jobs of the specified worker
  159. func (s *Manager) listJobsOfWorker(c *gin.Context) {
  160. workerID := c.Param("id")
  161. mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
  162. if err != nil {
  163. err := fmt.Errorf("failed to list jobs of worker %s: %s",
  164. workerID, err.Error(),
  165. )
  166. c.Error(err)
  167. s.returnErrJSON(c, http.StatusInternalServerError, err)
  168. return
  169. }
  170. c.JSON(http.StatusOK, mirrorStatusList)
  171. }
  172. func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
  173. c.JSON(code, gin.H{
  174. _errorKey: err.Error(),
  175. })
  176. }
  177. func (s *Manager) updateJobOfWorker(c *gin.Context) {
  178. workerID := c.Param("id")
  179. var status MirrorStatus
  180. c.BindJSON(&status)
  181. mirrorName := status.Name
  182. curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
  183. // Only successful syncing needs last_update
  184. if status.Status == Success {
  185. status.LastUpdate = time.Now()
  186. } else {
  187. status.LastUpdate = curStatus.LastUpdate
  188. }
  189. // for logging
  190. switch status.Status {
  191. case Success:
  192. logger.Noticef("Job [%s] @<%s> success", status.Name, status.Worker)
  193. case Failed:
  194. logger.Warningf("Job [%s] @<%s> failed", status.Name, status.Worker)
  195. case Syncing:
  196. logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
  197. case Disabled:
  198. logger.Noticef("Job [%s] @<%s> disabled", status.Name, status.Worker)
  199. case Paused:
  200. logger.Noticef("Job [%s] @<%s> paused", status.Name, status.Worker)
  201. default:
  202. logger.Infof("Job [%s] @<%s> status: %s", status.Name, status.Worker, status.Status)
  203. }
  204. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  205. if err != nil {
  206. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  207. mirrorName, workerID, err.Error(),
  208. )
  209. c.Error(err)
  210. s.returnErrJSON(c, http.StatusInternalServerError, err)
  211. return
  212. }
  213. c.JSON(http.StatusOK, newStatus)
  214. }
  215. func (s *Manager) handleClientCmd(c *gin.Context) {
  216. var clientCmd ClientCmd
  217. c.BindJSON(&clientCmd)
  218. workerID := clientCmd.WorkerID
  219. if workerID == "" {
  220. // TODO: decide which worker should do this mirror when WorkerID is null string
  221. logger.Errorf("handleClientCmd case workerID == \" \" not implemented yet")
  222. c.AbortWithStatus(http.StatusInternalServerError)
  223. return
  224. }
  225. w, err := s.adapter.GetWorker(workerID)
  226. if err != nil {
  227. err := fmt.Errorf("worker %s is not registered yet", workerID)
  228. s.returnErrJSON(c, http.StatusBadRequest, err)
  229. return
  230. }
  231. workerURL := w.URL
  232. // parse client cmd into worker cmd
  233. workerCmd := WorkerCmd{
  234. Cmd: clientCmd.Cmd,
  235. MirrorID: clientCmd.MirrorID,
  236. Args: clientCmd.Args,
  237. }
  238. // update job status, even if the job did not disable successfully,
  239. // this status should be set as disabled
  240. curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
  241. changed := false
  242. switch clientCmd.Cmd {
  243. case CmdDisable:
  244. curStat.Status = Disabled
  245. changed = true
  246. case CmdStop:
  247. curStat.Status = Paused
  248. changed = true
  249. }
  250. if changed {
  251. s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
  252. }
  253. logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)
  254. // post command to worker
  255. _, err = PostJSON(workerURL, workerCmd, s.httpClient)
  256. if err != nil {
  257. err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error())
  258. c.Error(err)
  259. s.returnErrJSON(c, http.StatusInternalServerError, err)
  260. return
  261. }
  262. // TODO: check response for success
  263. c.JSON(http.StatusOK, gin.H{_infoKey: "successfully send command to worker " + workerID})
  264. }