2
0

server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. package manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "github.com/gin-gonic/gin"
  8. . "github.com/tuna/tunasync/internal"
  9. )
  10. const (
  11. _errorKey = "error"
  12. _infoKey = "message"
  13. )
  14. var manager *Manager
  15. // A Manager represents a manager server
  16. type Manager struct {
  17. cfg *Config
  18. engine *gin.Engine
  19. adapter dbAdapter
  20. httpClient *http.Client
  21. }
  22. // GetTUNASyncManager returns the manager from config
  23. func GetTUNASyncManager(cfg *Config) *Manager {
  24. if manager != nil {
  25. return manager
  26. }
  27. // create gin engine
  28. if !cfg.Debug {
  29. gin.SetMode(gin.ReleaseMode)
  30. }
  31. s := &Manager{
  32. cfg: cfg,
  33. adapter: nil,
  34. }
  35. s.engine = gin.New()
  36. s.engine.Use(gin.Recovery())
  37. if cfg.Debug {
  38. s.engine.Use(gin.Logger())
  39. }
  40. if cfg.Files.CACert != "" {
  41. httpClient, err := CreateHTTPClient(cfg.Files.CACert)
  42. if err != nil {
  43. logger.Errorf("Error initializing HTTP client: %s", err.Error())
  44. return nil
  45. }
  46. s.httpClient = httpClient
  47. }
  48. if cfg.Files.DBFile != "" {
  49. adapter, err := makeDBAdapter(cfg.Files.DBType, cfg.Files.DBFile)
  50. if err != nil {
  51. logger.Errorf("Error initializing DB adapter: %s", err.Error())
  52. return nil
  53. }
  54. s.setDBAdapter(adapter)
  55. }
  56. // common log middleware
  57. s.engine.Use(contextErrorLogger)
  58. s.engine.GET("/ping", func(c *gin.Context) {
  59. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  60. })
  61. // list jobs, status page
  62. s.engine.GET("/jobs", s.listAllJobs)
  63. // flush disabled jobs
  64. s.engine.DELETE("/jobs/disabled", s.flushDisabledJobs)
  65. // list workers
  66. s.engine.GET("/workers", s.listWorkers)
  67. // worker online
  68. s.engine.POST("/workers", s.registerWorker)
  69. // workerID should be valid in this route group
  70. workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
  71. {
  72. // delete specified worker
  73. workerValidateGroup.DELETE(":id", s.deleteWorker)
  74. // get job list
  75. workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
  76. // post job status
  77. workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
  78. workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
  79. workerValidateGroup.POST(":id/schedules", s.updateSchedulesOfWorker)
  80. }
  81. // for tunasynctl to post commands
  82. s.engine.POST("/cmd", s.handleClientCmd)
  83. manager = s
  84. return s
  85. }
  86. func (s *Manager) setDBAdapter(adapter dbAdapter) {
  87. s.adapter = adapter
  88. }
  89. // Run runs the manager server forever
  90. func (s *Manager) Run() {
  91. addr := fmt.Sprintf("%s:%d", s.cfg.Server.Addr, s.cfg.Server.Port)
  92. httpServer := &http.Server{
  93. Addr: addr,
  94. Handler: s.engine,
  95. ReadTimeout: 10 * time.Second,
  96. WriteTimeout: 10 * time.Second,
  97. }
  98. if s.cfg.Server.SSLCert == "" && s.cfg.Server.SSLKey == "" {
  99. if err := httpServer.ListenAndServe(); err != nil {
  100. panic(err)
  101. }
  102. } else {
  103. if err := httpServer.ListenAndServeTLS(s.cfg.Server.SSLCert, s.cfg.Server.SSLKey); err != nil {
  104. panic(err)
  105. }
  106. }
  107. }
  108. // listAllJobs repond with all jobs of specified workers
  109. func (s *Manager) listAllJobs(c *gin.Context) {
  110. mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
  111. if err != nil {
  112. err := fmt.Errorf("failed to list all mirror status: %s",
  113. err.Error(),
  114. )
  115. c.Error(err)
  116. s.returnErrJSON(c, http.StatusInternalServerError, err)
  117. return
  118. }
  119. webMirStatusList := []WebMirrorStatus{}
  120. for _, m := range mirrorStatusList {
  121. webMirStatusList = append(
  122. webMirStatusList,
  123. BuildWebMirrorStatus(m),
  124. )
  125. }
  126. c.JSON(http.StatusOK, webMirStatusList)
  127. }
  128. // flushDisabledJobs deletes all jobs that marks as deleted
  129. func (s *Manager) flushDisabledJobs(c *gin.Context) {
  130. err := s.adapter.FlushDisabledJobs()
  131. if err != nil {
  132. err := fmt.Errorf("failed to flush disabled jobs: %s",
  133. err.Error(),
  134. )
  135. c.Error(err)
  136. s.returnErrJSON(c, http.StatusInternalServerError, err)
  137. return
  138. }
  139. c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
  140. }
  141. // deleteWorker deletes one worker by id
  142. func (s *Manager) deleteWorker(c *gin.Context) {
  143. workerID := c.Param("id")
  144. err := s.adapter.DeleteWorker(workerID)
  145. if err != nil {
  146. err := fmt.Errorf("failed to delete worker: %s",
  147. err.Error(),
  148. )
  149. c.Error(err)
  150. s.returnErrJSON(c, http.StatusInternalServerError, err)
  151. return
  152. }
  153. logger.Noticef("Worker <%s> deleted", workerID)
  154. c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
  155. }
  156. // listWrokers respond with informations of all the workers
  157. func (s *Manager) listWorkers(c *gin.Context) {
  158. var workerInfos []WorkerStatus
  159. workers, err := s.adapter.ListWorkers()
  160. if err != nil {
  161. err := fmt.Errorf("failed to list workers: %s",
  162. err.Error(),
  163. )
  164. c.Error(err)
  165. s.returnErrJSON(c, http.StatusInternalServerError, err)
  166. return
  167. }
  168. for _, w := range workers {
  169. workerInfos = append(workerInfos,
  170. WorkerStatus{
  171. ID: w.ID,
  172. LastOnline: w.LastOnline,
  173. })
  174. }
  175. c.JSON(http.StatusOK, workerInfos)
  176. }
  177. // registerWorker register an newly-online worker
  178. func (s *Manager) registerWorker(c *gin.Context) {
  179. var _worker WorkerStatus
  180. c.BindJSON(&_worker)
  181. _worker.LastOnline = time.Now()
  182. newWorker, err := s.adapter.CreateWorker(_worker)
  183. if err != nil {
  184. err := fmt.Errorf("failed to register worker: %s",
  185. err.Error(),
  186. )
  187. c.Error(err)
  188. s.returnErrJSON(c, http.StatusInternalServerError, err)
  189. return
  190. }
  191. logger.Noticef("Worker <%s> registered", _worker.ID)
  192. // create workerCmd channel for this worker
  193. c.JSON(http.StatusOK, newWorker)
  194. }
  195. // listJobsOfWorker respond with all the jobs of the specified worker
  196. func (s *Manager) listJobsOfWorker(c *gin.Context) {
  197. workerID := c.Param("id")
  198. mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
  199. if err != nil {
  200. err := fmt.Errorf("failed to list jobs of worker %s: %s",
  201. workerID, err.Error(),
  202. )
  203. c.Error(err)
  204. s.returnErrJSON(c, http.StatusInternalServerError, err)
  205. return
  206. }
  207. c.JSON(http.StatusOK, mirrorStatusList)
  208. }
  209. func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
  210. c.JSON(code, gin.H{
  211. _errorKey: err.Error(),
  212. })
  213. }
  214. func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
  215. workerID := c.Param("id")
  216. var schedules MirrorSchedules
  217. c.BindJSON(&schedules)
  218. for _, schedule := range schedules.Schedules {
  219. mirrorName := schedule.MirrorName
  220. if len(mirrorName) == 0 {
  221. s.returnErrJSON(
  222. c, http.StatusBadRequest,
  223. errors.New("Mirror Name should not be empty"),
  224. )
  225. }
  226. curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
  227. if err != nil {
  228. fmt.Errorf("failed to get job %s of worker %s: %s",
  229. mirrorName, workerID, err.Error(),
  230. )
  231. continue
  232. }
  233. if curStatus.Scheduled == schedule.NextSchedule {
  234. // no changes, skip update
  235. continue
  236. }
  237. curStatus.Scheduled = schedule.NextSchedule
  238. _, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
  239. if err != nil {
  240. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  241. mirrorName, workerID, err.Error(),
  242. )
  243. c.Error(err)
  244. s.returnErrJSON(c, http.StatusInternalServerError, err)
  245. return
  246. }
  247. }
  248. type empty struct{}
  249. c.JSON(http.StatusOK, empty{})
  250. }
  251. func (s *Manager) updateJobOfWorker(c *gin.Context) {
  252. workerID := c.Param("id")
  253. var status MirrorStatus
  254. c.BindJSON(&status)
  255. mirrorName := status.Name
  256. if len(mirrorName) == 0 {
  257. s.returnErrJSON(
  258. c, http.StatusBadRequest,
  259. errors.New("Mirror Name should not be empty"),
  260. )
  261. }
  262. curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
  263. // Only successful syncing needs last_update
  264. if status.Status == Success {
  265. status.LastUpdate = time.Now()
  266. } else {
  267. status.LastUpdate = curStatus.LastUpdate
  268. }
  269. if status.Status == Success || status.Status == Failed {
  270. status.LastEnded = time.Now()
  271. } else {
  272. status.LastEnded = curStatus.LastEnded
  273. }
  274. // Only message with meaningful size updates the mirror size
  275. if len(curStatus.Size) > 0 && curStatus.Size != "unknown" {
  276. if len(status.Size) == 0 || status.Size == "unknown" {
  277. status.Size = curStatus.Size
  278. }
  279. }
  280. // for logging
  281. switch status.Status {
  282. case Syncing:
  283. logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
  284. default:
  285. logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
  286. }
  287. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  288. if err != nil {
  289. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  290. mirrorName, workerID, err.Error(),
  291. )
  292. c.Error(err)
  293. s.returnErrJSON(c, http.StatusInternalServerError, err)
  294. return
  295. }
  296. c.JSON(http.StatusOK, newStatus)
  297. }
  298. func (s *Manager) updateMirrorSize(c *gin.Context) {
  299. workerID := c.Param("id")
  300. type SizeMsg struct {
  301. Name string `json:"name"`
  302. Size string `json:"size"`
  303. }
  304. var msg SizeMsg
  305. c.BindJSON(&msg)
  306. mirrorName := msg.Name
  307. status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
  308. if err != nil {
  309. logger.Errorf(
  310. "Failed to get status of mirror %s @<%s>: %s",
  311. mirrorName, workerID, err.Error(),
  312. )
  313. s.returnErrJSON(c, http.StatusInternalServerError, err)
  314. return
  315. }
  316. // Only message with meaningful size updates the mirror size
  317. if len(msg.Size) > 0 || msg.Size != "unknown" {
  318. status.Size = msg.Size
  319. }
  320. logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
  321. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  322. if err != nil {
  323. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  324. mirrorName, workerID, err.Error(),
  325. )
  326. c.Error(err)
  327. s.returnErrJSON(c, http.StatusInternalServerError, err)
  328. return
  329. }
  330. c.JSON(http.StatusOK, newStatus)
  331. }
  332. func (s *Manager) handleClientCmd(c *gin.Context) {
  333. var clientCmd ClientCmd
  334. c.BindJSON(&clientCmd)
  335. workerID := clientCmd.WorkerID
  336. if workerID == "" {
  337. // TODO: decide which worker should do this mirror when WorkerID is null string
  338. logger.Errorf("handleClientCmd case workerID == \" \" not implemented yet")
  339. c.AbortWithStatus(http.StatusInternalServerError)
  340. return
  341. }
  342. w, err := s.adapter.GetWorker(workerID)
  343. if err != nil {
  344. err := fmt.Errorf("worker %s is not registered yet", workerID)
  345. s.returnErrJSON(c, http.StatusBadRequest, err)
  346. return
  347. }
  348. workerURL := w.URL
  349. // parse client cmd into worker cmd
  350. workerCmd := WorkerCmd{
  351. Cmd: clientCmd.Cmd,
  352. MirrorID: clientCmd.MirrorID,
  353. Args: clientCmd.Args,
  354. Options: clientCmd.Options,
  355. }
  356. // update job status, even if the job did not disable successfully,
  357. // this status should be set as disabled
  358. curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
  359. changed := false
  360. switch clientCmd.Cmd {
  361. case CmdDisable:
  362. curStat.Status = Disabled
  363. changed = true
  364. case CmdStop:
  365. curStat.Status = Paused
  366. changed = true
  367. }
  368. if changed {
  369. s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
  370. }
  371. logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)
  372. // post command to worker
  373. _, err = PostJSON(workerURL, workerCmd, s.httpClient)
  374. if err != nil {
  375. err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error())
  376. c.Error(err)
  377. s.returnErrJSON(c, http.StatusInternalServerError, err)
  378. return
  379. }
  380. // TODO: check response for success
  381. c.JSON(http.StatusOK, gin.H{_infoKey: "successfully send command to worker " + workerID})
  382. }