server.go 9.8 KB


  1. package manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "github.com/gin-gonic/gin"
  8. . "github.com/tuna/tunasync/internal"
  9. )
// JSON keys used in the bodies of the manager's HTTP responses.
const (
	// _errorKey is the key under which returnErrJSON reports an error message.
	_errorKey = "error"
	// _infoKey is the key under which handlers report an informational
	// message such as "pong", "flushed" or "deleted".
	_infoKey = "message"
)
// manager holds the instance built by GetTUNASyncManager; once set, that
// function returns it instead of building a new one.
var manager *Manager

// A Manager represents a manager server
type Manager struct {
	// cfg is the configuration the manager was created from.
	cfg *Config
	// engine is the gin engine on which all HTTP routes are registered.
	engine *gin.Engine
	// adapter is the DB adapter the handlers use to read and write worker
	// and mirror status; it stays nil when no DB file is configured.
	adapter dbAdapter
	// httpClient is handed to PostJSON when commands are posted to workers;
	// it is only set when a CA certificate is configured.
	httpClient *http.Client
}
  22. // GetTUNASyncManager returns the manager from config
  23. func GetTUNASyncManager(cfg *Config) *Manager {
  24. if manager != nil {
  25. return manager
  26. }
  27. // create gin engine
  28. if !cfg.Debug {
  29. gin.SetMode(gin.ReleaseMode)
  30. }
  31. s := &Manager{
  32. cfg: cfg,
  33. adapter: nil,
  34. }
  35. s.engine = gin.New()
  36. s.engine.Use(gin.Recovery())
  37. if cfg.Debug {
  38. s.engine.Use(gin.Logger())
  39. }
  40. if cfg.Files.CACert != "" {
  41. httpClient, err := CreateHTTPClient(cfg.Files.CACert)
  42. if err != nil {
  43. logger.Errorf("Error initializing HTTP client: %s", err.Error())
  44. return nil
  45. }
  46. s.httpClient = httpClient
  47. }
  48. if cfg.Files.DBFile != "" {
  49. adapter, err := makeDBAdapter(cfg.Files.DBType, cfg.Files.DBFile)
  50. if err != nil {
  51. logger.Errorf("Error initializing DB adapter: %s", err.Error())
  52. return nil
  53. }
  54. s.setDBAdapter(adapter)
  55. }
  56. // common log middleware
  57. s.engine.Use(contextErrorLogger)
  58. s.engine.GET("/ping", func(c *gin.Context) {
  59. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  60. })
  61. // list jobs, status page
  62. s.engine.GET("/jobs", s.listAllJobs)
  63. // flush disabled jobs
  64. s.engine.DELETE("/jobs/disabled", s.flushDisabledJobs)
  65. // list workers
  66. s.engine.GET("/workers", s.listWorkers)
  67. // worker online
  68. s.engine.POST("/workers", s.registerWorker)
  69. // workerID should be valid in this route group
  70. workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
  71. {
  72. // delete specified worker
  73. workerValidateGroup.DELETE(":id", s.deleteWorker)
  74. // get job list
  75. workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
  76. // post job status
  77. workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
  78. workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
  79. }
  80. // for tunasynctl to post commands
  81. s.engine.POST("/cmd", s.handleClientCmd)
  82. manager = s
  83. return s
  84. }
// setDBAdapter sets the DB adapter that the manager's handlers use.
func (s *Manager) setDBAdapter(adapter dbAdapter) {
	s.adapter = adapter
}
  88. // Run runs the manager server forever
  89. func (s *Manager) Run() {
  90. addr := fmt.Sprintf("%s:%d", s.cfg.Server.Addr, s.cfg.Server.Port)
  91. httpServer := &http.Server{
  92. Addr: addr,
  93. Handler: s.engine,
  94. ReadTimeout: 10 * time.Second,
  95. WriteTimeout: 10 * time.Second,
  96. }
  97. if s.cfg.Server.SSLCert == "" && s.cfg.Server.SSLKey == "" {
  98. if err := httpServer.ListenAndServe(); err != nil {
  99. panic(err)
  100. }
  101. } else {
  102. if err := httpServer.ListenAndServeTLS(s.cfg.Server.SSLCert, s.cfg.Server.SSLKey); err != nil {
  103. panic(err)
  104. }
  105. }
  106. }
  107. // listAllJobs repond with all jobs of specified workers
  108. func (s *Manager) listAllJobs(c *gin.Context) {
  109. mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
  110. if err != nil {
  111. err := fmt.Errorf("failed to list all mirror status: %s",
  112. err.Error(),
  113. )
  114. c.Error(err)
  115. s.returnErrJSON(c, http.StatusInternalServerError, err)
  116. return
  117. }
  118. webMirStatusList := []WebMirrorStatus{}
  119. for _, m := range mirrorStatusList {
  120. webMirStatusList = append(
  121. webMirStatusList,
  122. BuildWebMirrorStatus(m),
  123. )
  124. }
  125. c.JSON(http.StatusOK, webMirStatusList)
  126. }
  127. // flushDisabledJobs deletes all jobs that marks as deleted
  128. func (s *Manager) flushDisabledJobs(c *gin.Context) {
  129. err := s.adapter.FlushDisabledJobs()
  130. if err != nil {
  131. err := fmt.Errorf("failed to flush disabled jobs: %s",
  132. err.Error(),
  133. )
  134. c.Error(err)
  135. s.returnErrJSON(c, http.StatusInternalServerError, err)
  136. return
  137. }
  138. c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
  139. }
  140. // deleteWorker deletes one worker by id
  141. func (s *Manager) deleteWorker(c *gin.Context) {
  142. workerID := c.Param("id")
  143. err := s.adapter.DeleteWorker(workerID)
  144. if err != nil {
  145. err := fmt.Errorf("failed to delete worker: %s",
  146. err.Error(),
  147. )
  148. c.Error(err)
  149. s.returnErrJSON(c, http.StatusInternalServerError, err)
  150. return
  151. }
  152. logger.Noticef("Worker <%s> deleted", workerID)
  153. c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
  154. }
  155. // listWrokers respond with informations of all the workers
  156. func (s *Manager) listWorkers(c *gin.Context) {
  157. var workerInfos []WorkerStatus
  158. workers, err := s.adapter.ListWorkers()
  159. if err != nil {
  160. err := fmt.Errorf("failed to list workers: %s",
  161. err.Error(),
  162. )
  163. c.Error(err)
  164. s.returnErrJSON(c, http.StatusInternalServerError, err)
  165. return
  166. }
  167. for _, w := range workers {
  168. workerInfos = append(workerInfos,
  169. WorkerStatus{
  170. ID: w.ID,
  171. LastOnline: w.LastOnline,
  172. })
  173. }
  174. c.JSON(http.StatusOK, workerInfos)
  175. }
  176. // registerWorker register an newly-online worker
  177. func (s *Manager) registerWorker(c *gin.Context) {
  178. var _worker WorkerStatus
  179. c.BindJSON(&_worker)
  180. _worker.LastOnline = time.Now()
  181. newWorker, err := s.adapter.CreateWorker(_worker)
  182. if err != nil {
  183. err := fmt.Errorf("failed to register worker: %s",
  184. err.Error(),
  185. )
  186. c.Error(err)
  187. s.returnErrJSON(c, http.StatusInternalServerError, err)
  188. return
  189. }
  190. logger.Noticef("Worker <%s> registered", _worker.ID)
  191. // create workerCmd channel for this worker
  192. c.JSON(http.StatusOK, newWorker)
  193. }
  194. // listJobsOfWorker respond with all the jobs of the specified worker
  195. func (s *Manager) listJobsOfWorker(c *gin.Context) {
  196. workerID := c.Param("id")
  197. mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
  198. if err != nil {
  199. err := fmt.Errorf("failed to list jobs of worker %s: %s",
  200. workerID, err.Error(),
  201. )
  202. c.Error(err)
  203. s.returnErrJSON(c, http.StatusInternalServerError, err)
  204. return
  205. }
  206. c.JSON(http.StatusOK, mirrorStatusList)
  207. }
  208. func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
  209. c.JSON(code, gin.H{
  210. _errorKey: err.Error(),
  211. })
  212. }
  213. func (s *Manager) updateJobOfWorker(c *gin.Context) {
  214. workerID := c.Param("id")
  215. var status MirrorStatus
  216. c.BindJSON(&status)
  217. mirrorName := status.Name
  218. if len(mirrorName) == 0 {
  219. s.returnErrJSON(
  220. c, http.StatusBadRequest,
  221. errors.New("Mirror Name should not be empty"),
  222. )
  223. }
  224. curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
  225. // Only successful syncing needs last_update
  226. if status.Status == Success {
  227. status.LastUpdate = time.Now()
  228. } else {
  229. status.LastUpdate = curStatus.LastUpdate
  230. }
  231. if status.Status == Success || status.Status == Failed {
  232. status.LastEnded = time.Now()
  233. } else {
  234. status.LastEnded = curStatus.LastEnded
  235. }
  236. // Only message with meaningful size updates the mirror size
  237. if len(curStatus.Size) > 0 && curStatus.Size != "unknown" {
  238. if len(status.Size) == 0 || status.Size == "unknown" {
  239. status.Size = curStatus.Size
  240. }
  241. }
  242. // for logging
  243. switch status.Status {
  244. case Syncing:
  245. logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
  246. default:
  247. logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
  248. }
  249. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  250. if err != nil {
  251. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  252. mirrorName, workerID, err.Error(),
  253. )
  254. c.Error(err)
  255. s.returnErrJSON(c, http.StatusInternalServerError, err)
  256. return
  257. }
  258. c.JSON(http.StatusOK, newStatus)
  259. }
  260. func (s *Manager) updateMirrorSize(c *gin.Context) {
  261. workerID := c.Param("id")
  262. type SizeMsg struct {
  263. Name string `json:"name"`
  264. Size string `json:"size"`
  265. }
  266. var msg SizeMsg
  267. c.BindJSON(&msg)
  268. mirrorName := msg.Name
  269. status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
  270. if err != nil {
  271. logger.Errorf(
  272. "Failed to get status of mirror %s @<%s>: %s",
  273. mirrorName, workerID, err.Error(),
  274. )
  275. s.returnErrJSON(c, http.StatusInternalServerError, err)
  276. return
  277. }
  278. // Only message with meaningful size updates the mirror size
  279. if len(msg.Size) > 0 || msg.Size != "unknown" {
  280. status.Size = msg.Size
  281. }
  282. logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
  283. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  284. if err != nil {
  285. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  286. mirrorName, workerID, err.Error(),
  287. )
  288. c.Error(err)
  289. s.returnErrJSON(c, http.StatusInternalServerError, err)
  290. return
  291. }
  292. c.JSON(http.StatusOK, newStatus)
  293. }
  294. func (s *Manager) handleClientCmd(c *gin.Context) {
  295. var clientCmd ClientCmd
  296. c.BindJSON(&clientCmd)
  297. workerID := clientCmd.WorkerID
  298. if workerID == "" {
  299. // TODO: decide which worker should do this mirror when WorkerID is null string
  300. logger.Errorf("handleClientCmd case workerID == \" \" not implemented yet")
  301. c.AbortWithStatus(http.StatusInternalServerError)
  302. return
  303. }
  304. w, err := s.adapter.GetWorker(workerID)
  305. if err != nil {
  306. err := fmt.Errorf("worker %s is not registered yet", workerID)
  307. s.returnErrJSON(c, http.StatusBadRequest, err)
  308. return
  309. }
  310. workerURL := w.URL
  311. // parse client cmd into worker cmd
  312. workerCmd := WorkerCmd{
  313. Cmd: clientCmd.Cmd,
  314. MirrorID: clientCmd.MirrorID,
  315. Args: clientCmd.Args,
  316. Options: clientCmd.Options,
  317. }
  318. // update job status, even if the job did not disable successfully,
  319. // this status should be set as disabled
  320. curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
  321. changed := false
  322. switch clientCmd.Cmd {
  323. case CmdDisable:
  324. curStat.Status = Disabled
  325. changed = true
  326. case CmdStop:
  327. curStat.Status = Paused
  328. changed = true
  329. }
  330. if changed {
  331. s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
  332. }
  333. logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)
  334. // post command to worker
  335. _, err = PostJSON(workerURL, workerCmd, s.httpClient)
  336. if err != nil {
  337. err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error())
  338. c.Error(err)
  339. s.returnErrJSON(c, http.StatusInternalServerError, err)
  340. return
  341. }
  342. // TODO: check response for success
  343. c.JSON(http.StatusOK, gin.H{_infoKey: "successfully send command to worker " + workerID})
  344. }