server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. package manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/gin-gonic/gin"
  9. . "github.com/tuna/tunasync/internal"
  10. )
  11. const (
  12. _errorKey = "error"
  13. _infoKey = "message"
  14. )
  15. var manager *Manager
  16. // A Manager represents a manager server
  17. type Manager struct {
  18. cfg *Config
  19. engine *gin.Engine
  20. adapter dbAdapter
  21. rwmu sync.RWMutex
  22. httpClient *http.Client
  23. }
  24. // GetTUNASyncManager returns the manager from config
  25. func GetTUNASyncManager(cfg *Config) *Manager {
  26. if manager != nil {
  27. return manager
  28. }
  29. // create gin engine
  30. if !cfg.Debug {
  31. gin.SetMode(gin.ReleaseMode)
  32. }
  33. s := &Manager{
  34. cfg: cfg,
  35. adapter: nil,
  36. }
  37. s.engine = gin.New()
  38. s.engine.Use(gin.Recovery())
  39. if cfg.Debug {
  40. s.engine.Use(gin.Logger())
  41. }
  42. if cfg.Files.CACert != "" {
  43. httpClient, err := CreateHTTPClient(cfg.Files.CACert)
  44. if err != nil {
  45. logger.Errorf("Error initializing HTTP client: %s", err.Error())
  46. return nil
  47. }
  48. s.httpClient = httpClient
  49. }
  50. if cfg.Files.DBFile != "" {
  51. adapter, err := makeDBAdapter(cfg.Files.DBType, cfg.Files.DBFile)
  52. if err != nil {
  53. logger.Errorf("Error initializing DB adapter: %s", err.Error())
  54. return nil
  55. }
  56. s.setDBAdapter(adapter)
  57. }
  58. // common log middleware
  59. s.engine.Use(contextErrorLogger)
  60. s.engine.GET("/ping", func(c *gin.Context) {
  61. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  62. })
  63. // list jobs, status page
  64. s.engine.GET("/jobs", s.listAllJobs)
  65. // flush disabled jobs
  66. s.engine.DELETE("/jobs/disabled", s.flushDisabledJobs)
  67. // list workers
  68. s.engine.GET("/workers", s.listWorkers)
  69. // worker online
  70. s.engine.POST("/workers", s.registerWorker)
  71. // workerID should be valid in this route group
  72. workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
  73. {
  74. // delete specified worker
  75. workerValidateGroup.DELETE(":id", s.deleteWorker)
  76. // get job list
  77. workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
  78. // post job status
  79. workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
  80. workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
  81. workerValidateGroup.POST(":id/schedules", s.updateSchedulesOfWorker)
  82. }
  83. // for tunasynctl to post commands
  84. s.engine.POST("/cmd", s.handleClientCmd)
  85. manager = s
  86. return s
  87. }
  88. func (s *Manager) setDBAdapter(adapter dbAdapter) {
  89. s.adapter = adapter
  90. }
  91. // Run runs the manager server forever
  92. func (s *Manager) Run() {
  93. addr := fmt.Sprintf("%s:%d", s.cfg.Server.Addr, s.cfg.Server.Port)
  94. httpServer := &http.Server{
  95. Addr: addr,
  96. Handler: s.engine,
  97. ReadTimeout: 10 * time.Second,
  98. WriteTimeout: 10 * time.Second,
  99. }
  100. if s.cfg.Server.SSLCert == "" && s.cfg.Server.SSLKey == "" {
  101. if err := httpServer.ListenAndServe(); err != nil {
  102. panic(err)
  103. }
  104. } else {
  105. if err := httpServer.ListenAndServeTLS(s.cfg.Server.SSLCert, s.cfg.Server.SSLKey); err != nil {
  106. panic(err)
  107. }
  108. }
  109. }
  110. // listAllJobs respond with all jobs of specified workers
  111. func (s *Manager) listAllJobs(c *gin.Context) {
  112. s.rwmu.RLock()
  113. mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
  114. s.rwmu.RUnlock()
  115. if err != nil {
  116. err := fmt.Errorf("failed to list all mirror status: %s",
  117. err.Error(),
  118. )
  119. c.Error(err)
  120. s.returnErrJSON(c, http.StatusInternalServerError, err)
  121. return
  122. }
  123. webMirStatusList := []WebMirrorStatus{}
  124. for _, m := range mirrorStatusList {
  125. webMirStatusList = append(
  126. webMirStatusList,
  127. BuildWebMirrorStatus(m),
  128. )
  129. }
  130. c.JSON(http.StatusOK, webMirStatusList)
  131. }
  132. // flushDisabledJobs deletes all jobs that marks as deleted
  133. func (s *Manager) flushDisabledJobs(c *gin.Context) {
  134. s.rwmu.Lock()
  135. err := s.adapter.FlushDisabledJobs()
  136. s.rwmu.Unlock()
  137. if err != nil {
  138. err := fmt.Errorf("failed to flush disabled jobs: %s",
  139. err.Error(),
  140. )
  141. c.Error(err)
  142. s.returnErrJSON(c, http.StatusInternalServerError, err)
  143. return
  144. }
  145. c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
  146. }
  147. // deleteWorker deletes one worker by id
  148. func (s *Manager) deleteWorker(c *gin.Context) {
  149. workerID := c.Param("id")
  150. s.rwmu.Lock()
  151. err := s.adapter.DeleteWorker(workerID)
  152. s.rwmu.Unlock()
  153. if err != nil {
  154. err := fmt.Errorf("failed to delete worker: %s",
  155. err.Error(),
  156. )
  157. c.Error(err)
  158. s.returnErrJSON(c, http.StatusInternalServerError, err)
  159. return
  160. }
  161. logger.Noticef("Worker <%s> deleted", workerID)
  162. c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
  163. }
  164. // listWorkers respond with information of all the workers
  165. func (s *Manager) listWorkers(c *gin.Context) {
  166. var workerInfos []WorkerStatus
  167. s.rwmu.RLock()
  168. workers, err := s.adapter.ListWorkers()
  169. s.rwmu.RUnlock()
  170. if err != nil {
  171. err := fmt.Errorf("failed to list workers: %s",
  172. err.Error(),
  173. )
  174. c.Error(err)
  175. s.returnErrJSON(c, http.StatusInternalServerError, err)
  176. return
  177. }
  178. for _, w := range workers {
  179. workerInfos = append(workerInfos,
  180. WorkerStatus{
  181. ID: w.ID,
  182. LastOnline: w.LastOnline,
  183. LastRegister: w.LastRegister,
  184. })
  185. }
  186. c.JSON(http.StatusOK, workerInfos)
  187. }
  188. // registerWorker register an newly-online worker
  189. func (s *Manager) registerWorker(c *gin.Context) {
  190. var _worker WorkerStatus
  191. c.BindJSON(&_worker)
  192. _worker.LastOnline = time.Now()
  193. _worker.LastRegister = time.Now()
  194. newWorker, err := s.adapter.CreateWorker(_worker)
  195. if err != nil {
  196. err := fmt.Errorf("failed to register worker: %s",
  197. err.Error(),
  198. )
  199. c.Error(err)
  200. s.returnErrJSON(c, http.StatusInternalServerError, err)
  201. return
  202. }
  203. logger.Noticef("Worker <%s> registered", _worker.ID)
  204. // create workerCmd channel for this worker
  205. c.JSON(http.StatusOK, newWorker)
  206. }
  207. // listJobsOfWorker respond with all the jobs of the specified worker
  208. func (s *Manager) listJobsOfWorker(c *gin.Context) {
  209. workerID := c.Param("id")
  210. s.rwmu.RLock()
  211. mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
  212. s.rwmu.RUnlock()
  213. if err != nil {
  214. err := fmt.Errorf("failed to list jobs of worker %s: %s",
  215. workerID, err.Error(),
  216. )
  217. c.Error(err)
  218. s.returnErrJSON(c, http.StatusInternalServerError, err)
  219. return
  220. }
  221. c.JSON(http.StatusOK, mirrorStatusList)
  222. }
  223. func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
  224. c.JSON(code, gin.H{
  225. _errorKey: err.Error(),
  226. })
  227. }
  228. func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
  229. workerID := c.Param("id")
  230. var schedules MirrorSchedules
  231. c.BindJSON(&schedules)
  232. for _, schedule := range schedules.Schedules {
  233. mirrorName := schedule.MirrorName
  234. if len(mirrorName) == 0 {
  235. s.returnErrJSON(
  236. c, http.StatusBadRequest,
  237. errors.New("Mirror Name should not be empty"),
  238. )
  239. }
  240. s.rwmu.RLock()
  241. s.adapter.RefreshWorker(workerID)
  242. curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
  243. s.rwmu.RUnlock()
  244. if err != nil {
  245. fmt.Errorf("failed to get job %s of worker %s: %s",
  246. mirrorName, workerID, err.Error(),
  247. )
  248. continue
  249. }
  250. if curStatus.Scheduled == schedule.NextSchedule {
  251. // no changes, skip update
  252. continue
  253. }
  254. curStatus.Scheduled = schedule.NextSchedule
  255. s.rwmu.Lock()
  256. _, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
  257. s.rwmu.Unlock()
  258. if err != nil {
  259. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  260. mirrorName, workerID, err.Error(),
  261. )
  262. c.Error(err)
  263. s.returnErrJSON(c, http.StatusInternalServerError, err)
  264. return
  265. }
  266. }
  267. type empty struct{}
  268. c.JSON(http.StatusOK, empty{})
  269. }
  270. func (s *Manager) updateJobOfWorker(c *gin.Context) {
  271. workerID := c.Param("id")
  272. var status MirrorStatus
  273. c.BindJSON(&status)
  274. mirrorName := status.Name
  275. if len(mirrorName) == 0 {
  276. s.returnErrJSON(
  277. c, http.StatusBadRequest,
  278. errors.New("Mirror Name should not be empty"),
  279. )
  280. }
  281. s.rwmu.RLock()
  282. s.adapter.RefreshWorker(workerID)
  283. curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
  284. s.rwmu.RUnlock()
  285. curTime := time.Now()
  286. if status.Status == PreSyncing && curStatus.Status != PreSyncing {
  287. status.LastStarted = curTime
  288. } else {
  289. status.LastStarted = curStatus.LastStarted
  290. }
  291. // Only successful syncing needs last_update
  292. if status.Status == Success {
  293. status.LastUpdate = curTime
  294. } else {
  295. status.LastUpdate = curStatus.LastUpdate
  296. }
  297. if status.Status == Success || status.Status == Failed {
  298. status.LastEnded = curTime
  299. } else {
  300. status.LastEnded = curStatus.LastEnded
  301. }
  302. // Only message with meaningful size updates the mirror size
  303. if len(curStatus.Size) > 0 && curStatus.Size != "unknown" {
  304. if len(status.Size) == 0 || status.Size == "unknown" {
  305. status.Size = curStatus.Size
  306. }
  307. }
  308. // for logging
  309. switch status.Status {
  310. case Syncing:
  311. logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
  312. default:
  313. logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
  314. }
  315. s.rwmu.Lock()
  316. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  317. s.rwmu.Unlock()
  318. if err != nil {
  319. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  320. mirrorName, workerID, err.Error(),
  321. )
  322. c.Error(err)
  323. s.returnErrJSON(c, http.StatusInternalServerError, err)
  324. return
  325. }
  326. c.JSON(http.StatusOK, newStatus)
  327. }
  328. func (s *Manager) updateMirrorSize(c *gin.Context) {
  329. workerID := c.Param("id")
  330. type SizeMsg struct {
  331. Name string `json:"name"`
  332. Size string `json:"size"`
  333. }
  334. var msg SizeMsg
  335. c.BindJSON(&msg)
  336. mirrorName := msg.Name
  337. s.rwmu.RLock()
  338. s.adapter.RefreshWorker(workerID)
  339. status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
  340. s.rwmu.RUnlock()
  341. if err != nil {
  342. logger.Errorf(
  343. "Failed to get status of mirror %s @<%s>: %s",
  344. mirrorName, workerID, err.Error(),
  345. )
  346. s.returnErrJSON(c, http.StatusInternalServerError, err)
  347. return
  348. }
  349. // Only message with meaningful size updates the mirror size
  350. if len(msg.Size) > 0 || msg.Size != "unknown" {
  351. status.Size = msg.Size
  352. }
  353. logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
  354. s.rwmu.Lock()
  355. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  356. s.rwmu.Unlock()
  357. if err != nil {
  358. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  359. mirrorName, workerID, err.Error(),
  360. )
  361. c.Error(err)
  362. s.returnErrJSON(c, http.StatusInternalServerError, err)
  363. return
  364. }
  365. c.JSON(http.StatusOK, newStatus)
  366. }
  367. func (s *Manager) handleClientCmd(c *gin.Context) {
  368. var clientCmd ClientCmd
  369. c.BindJSON(&clientCmd)
  370. workerID := clientCmd.WorkerID
  371. if workerID == "" {
  372. // TODO: decide which worker should do this mirror when WorkerID is null string
  373. logger.Errorf("handleClientCmd case workerID == \" \" not implemented yet")
  374. c.AbortWithStatus(http.StatusInternalServerError)
  375. return
  376. }
  377. s.rwmu.RLock()
  378. w, err := s.adapter.GetWorker(workerID)
  379. s.rwmu.RUnlock()
  380. if err != nil {
  381. err := fmt.Errorf("worker %s is not registered yet", workerID)
  382. s.returnErrJSON(c, http.StatusBadRequest, err)
  383. return
  384. }
  385. workerURL := w.URL
  386. // parse client cmd into worker cmd
  387. workerCmd := WorkerCmd{
  388. Cmd: clientCmd.Cmd,
  389. MirrorID: clientCmd.MirrorID,
  390. Args: clientCmd.Args,
  391. Options: clientCmd.Options,
  392. }
  393. // update job status, even if the job did not disable successfully,
  394. // this status should be set as disabled
  395. s.rwmu.RLock()
  396. curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
  397. s.rwmu.RUnlock()
  398. changed := false
  399. switch clientCmd.Cmd {
  400. case CmdDisable:
  401. curStat.Status = Disabled
  402. changed = true
  403. case CmdStop:
  404. curStat.Status = Paused
  405. changed = true
  406. }
  407. if changed {
  408. s.rwmu.Lock()
  409. s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
  410. s.rwmu.Unlock()
  411. }
  412. logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)
  413. // post command to worker
  414. _, err = PostJSON(workerURL, workerCmd, s.httpClient)
  415. if err != nil {
  416. err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error())
  417. c.Error(err)
  418. s.returnErrJSON(c, http.StatusInternalServerError, err)
  419. return
  420. }
  421. // TODO: check response for success
  422. c.JSON(http.StatusOK, gin.H{_infoKey: "successfully send command to worker " + workerID})
  423. }