server.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. package manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "github.com/gin-gonic/gin"
  8. . "github.com/tuna/tunasync/internal"
  9. )
  10. const (
  11. _errorKey = "error"
  12. _infoKey = "message"
  13. )
  14. var manager *Manager
  15. // A Manager represents a manager server
  16. type Manager struct {
  17. cfg *Config
  18. engine *gin.Engine
  19. adapter dbAdapter
  20. httpClient *http.Client
  21. }
  22. // GetTUNASyncManager returns the manager from config
  23. func GetTUNASyncManager(cfg *Config) *Manager {
  24. if manager != nil {
  25. return manager
  26. }
  27. // create gin engine
  28. if !cfg.Debug {
  29. gin.SetMode(gin.ReleaseMode)
  30. }
  31. s := &Manager{
  32. cfg: cfg,
  33. adapter: nil,
  34. }
  35. s.engine = gin.New()
  36. s.engine.Use(gin.Recovery())
  37. if cfg.Debug {
  38. s.engine.Use(gin.Logger())
  39. }
  40. if cfg.Files.CACert != "" {
  41. httpClient, err := CreateHTTPClient(cfg.Files.CACert)
  42. if err != nil {
  43. logger.Errorf("Error initializing HTTP client: %s", err.Error())
  44. return nil
  45. }
  46. s.httpClient = httpClient
  47. }
  48. if cfg.Files.DBFile != "" {
  49. adapter, err := makeDBAdapter(cfg.Files.DBType, cfg.Files.DBFile)
  50. if err != nil {
  51. logger.Errorf("Error initializing DB adapter: %s", err.Error())
  52. return nil
  53. }
  54. s.setDBAdapter(adapter)
  55. }
  56. // common log middleware
  57. s.engine.Use(contextErrorLogger)
  58. s.engine.GET("/ping", func(c *gin.Context) {
  59. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  60. })
  61. // list jobs, status page
  62. s.engine.GET("/jobs", s.listAllJobs)
  63. // flush disabled jobs
  64. s.engine.DELETE("/jobs/disabled", s.flushDisabledJobs)
  65. // list workers
  66. s.engine.GET("/workers", s.listWorkers)
  67. // worker online
  68. s.engine.POST("/workers", s.registerWorker)
  69. // workerID should be valid in this route group
  70. workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
  71. {
  72. // get job list
  73. workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
  74. // post job status
  75. workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
  76. workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
  77. }
  78. // for tunasynctl to post commands
  79. s.engine.POST("/cmd", s.handleClientCmd)
  80. manager = s
  81. return s
  82. }
  83. func (s *Manager) setDBAdapter(adapter dbAdapter) {
  84. s.adapter = adapter
  85. }
  86. // Run runs the manager server forever
  87. func (s *Manager) Run() {
  88. addr := fmt.Sprintf("%s:%d", s.cfg.Server.Addr, s.cfg.Server.Port)
  89. httpServer := &http.Server{
  90. Addr: addr,
  91. Handler: s.engine,
  92. ReadTimeout: 10 * time.Second,
  93. WriteTimeout: 10 * time.Second,
  94. }
  95. if s.cfg.Server.SSLCert == "" && s.cfg.Server.SSLKey == "" {
  96. if err := httpServer.ListenAndServe(); err != nil {
  97. panic(err)
  98. }
  99. } else {
  100. if err := httpServer.ListenAndServeTLS(s.cfg.Server.SSLCert, s.cfg.Server.SSLKey); err != nil {
  101. panic(err)
  102. }
  103. }
  104. }
  105. // listAllJobs repond with all jobs of specified workers
  106. func (s *Manager) listAllJobs(c *gin.Context) {
  107. mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
  108. if err != nil {
  109. err := fmt.Errorf("failed to list all mirror status: %s",
  110. err.Error(),
  111. )
  112. c.Error(err)
  113. s.returnErrJSON(c, http.StatusInternalServerError, err)
  114. return
  115. }
  116. webMirStatusList := []WebMirrorStatus{}
  117. for _, m := range mirrorStatusList {
  118. webMirStatusList = append(
  119. webMirStatusList,
  120. BuildWebMirrorStatus(m),
  121. )
  122. }
  123. c.JSON(http.StatusOK, webMirStatusList)
  124. }
  125. // flushDisabledJobs deletes all jobs that marks as deleted
  126. func (s *Manager) flushDisabledJobs(c *gin.Context) {
  127. err := s.adapter.FlushDisabledJobs()
  128. if err != nil {
  129. err := fmt.Errorf("failed to flush disabled jobs: %s",
  130. err.Error(),
  131. )
  132. c.Error(err)
  133. s.returnErrJSON(c, http.StatusInternalServerError, err)
  134. return
  135. }
  136. c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
  137. }
  138. // listWrokers respond with informations of all the workers
  139. func (s *Manager) listWorkers(c *gin.Context) {
  140. var workerInfos []WorkerStatus
  141. workers, err := s.adapter.ListWorkers()
  142. if err != nil {
  143. err := fmt.Errorf("failed to list workers: %s",
  144. err.Error(),
  145. )
  146. c.Error(err)
  147. s.returnErrJSON(c, http.StatusInternalServerError, err)
  148. return
  149. }
  150. for _, w := range workers {
  151. workerInfos = append(workerInfos,
  152. WorkerStatus{
  153. ID: w.ID,
  154. LastOnline: w.LastOnline,
  155. })
  156. }
  157. c.JSON(http.StatusOK, workerInfos)
  158. }
  159. // registerWorker register an newly-online worker
  160. func (s *Manager) registerWorker(c *gin.Context) {
  161. var _worker WorkerStatus
  162. c.BindJSON(&_worker)
  163. _worker.LastOnline = time.Now()
  164. newWorker, err := s.adapter.CreateWorker(_worker)
  165. if err != nil {
  166. err := fmt.Errorf("failed to register worker: %s",
  167. err.Error(),
  168. )
  169. c.Error(err)
  170. s.returnErrJSON(c, http.StatusInternalServerError, err)
  171. return
  172. }
  173. logger.Noticef("Worker <%s> registered", _worker.ID)
  174. // create workerCmd channel for this worker
  175. c.JSON(http.StatusOK, newWorker)
  176. }
  177. // listJobsOfWorker respond with all the jobs of the specified worker
  178. func (s *Manager) listJobsOfWorker(c *gin.Context) {
  179. workerID := c.Param("id")
  180. mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
  181. if err != nil {
  182. err := fmt.Errorf("failed to list jobs of worker %s: %s",
  183. workerID, err.Error(),
  184. )
  185. c.Error(err)
  186. s.returnErrJSON(c, http.StatusInternalServerError, err)
  187. return
  188. }
  189. c.JSON(http.StatusOK, mirrorStatusList)
  190. }
  191. func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
  192. c.JSON(code, gin.H{
  193. _errorKey: err.Error(),
  194. })
  195. }
  196. func (s *Manager) updateJobOfWorker(c *gin.Context) {
  197. workerID := c.Param("id")
  198. var status MirrorStatus
  199. c.BindJSON(&status)
  200. mirrorName := status.Name
  201. if len(mirrorName) == 0 {
  202. s.returnErrJSON(
  203. c, http.StatusBadRequest,
  204. errors.New("Mirror Name should not be empty"),
  205. )
  206. }
  207. curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
  208. // Only successful syncing needs last_update
  209. if status.Status == Success {
  210. status.LastUpdate = time.Now()
  211. } else {
  212. status.LastUpdate = curStatus.LastUpdate
  213. }
  214. if status.Status == Success || status.Status == Failed {
  215. status.LastEnded = time.Now()
  216. } else {
  217. status.LastEnded = curStatus.LastEnded
  218. }
  219. // Only message with meaningful size updates the mirror size
  220. if len(curStatus.Size) > 0 && curStatus.Size != "unknown" {
  221. if len(status.Size) == 0 || status.Size == "unknown" {
  222. status.Size = curStatus.Size
  223. }
  224. }
  225. // for logging
  226. switch status.Status {
  227. case Syncing:
  228. logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
  229. default:
  230. logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
  231. }
  232. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  233. if err != nil {
  234. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  235. mirrorName, workerID, err.Error(),
  236. )
  237. c.Error(err)
  238. s.returnErrJSON(c, http.StatusInternalServerError, err)
  239. return
  240. }
  241. c.JSON(http.StatusOK, newStatus)
  242. }
  243. func (s *Manager) updateMirrorSize(c *gin.Context) {
  244. workerID := c.Param("id")
  245. type SizeMsg struct {
  246. Name string `json:"name"`
  247. Size string `json:"size"`
  248. }
  249. var msg SizeMsg
  250. c.BindJSON(&msg)
  251. mirrorName := msg.Name
  252. status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
  253. if err != nil {
  254. logger.Errorf(
  255. "Failed to get status of mirror %s @<%s>: %s",
  256. mirrorName, workerID, err.Error(),
  257. )
  258. s.returnErrJSON(c, http.StatusInternalServerError, err)
  259. return
  260. }
  261. // Only message with meaningful size updates the mirror size
  262. if len(msg.Size) > 0 || msg.Size != "unknown" {
  263. status.Size = msg.Size
  264. }
  265. logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
  266. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  267. if err != nil {
  268. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  269. mirrorName, workerID, err.Error(),
  270. )
  271. c.Error(err)
  272. s.returnErrJSON(c, http.StatusInternalServerError, err)
  273. return
  274. }
  275. c.JSON(http.StatusOK, newStatus)
  276. }
  277. func (s *Manager) handleClientCmd(c *gin.Context) {
  278. var clientCmd ClientCmd
  279. c.BindJSON(&clientCmd)
  280. workerID := clientCmd.WorkerID
  281. if workerID == "" {
  282. // TODO: decide which worker should do this mirror when WorkerID is null string
  283. logger.Errorf("handleClientCmd case workerID == \" \" not implemented yet")
  284. c.AbortWithStatus(http.StatusInternalServerError)
  285. return
  286. }
  287. w, err := s.adapter.GetWorker(workerID)
  288. if err != nil {
  289. err := fmt.Errorf("worker %s is not registered yet", workerID)
  290. s.returnErrJSON(c, http.StatusBadRequest, err)
  291. return
  292. }
  293. workerURL := w.URL
  294. // parse client cmd into worker cmd
  295. workerCmd := WorkerCmd{
  296. Cmd: clientCmd.Cmd,
  297. MirrorID: clientCmd.MirrorID,
  298. Args: clientCmd.Args,
  299. }
  300. // update job status, even if the job did not disable successfully,
  301. // this status should be set as disabled
  302. curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
  303. changed := false
  304. switch clientCmd.Cmd {
  305. case CmdDisable:
  306. curStat.Status = Disabled
  307. changed = true
  308. case CmdStop:
  309. curStat.Status = Paused
  310. changed = true
  311. }
  312. if changed {
  313. s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
  314. }
  315. logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)
  316. // post command to worker
  317. _, err = PostJSON(workerURL, workerCmd, s.httpClient)
  318. if err != nil {
  319. err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error())
  320. c.Error(err)
  321. s.returnErrJSON(c, http.StatusInternalServerError, err)
  322. return
  323. }
  324. // TODO: check response for success
  325. c.JSON(http.StatusOK, gin.H{_infoKey: "successfully send command to worker " + workerID})
  326. }