server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. package manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/gin-gonic/gin"
  9. . "github.com/tuna/tunasync/internal"
  10. )
  11. const (
  12. _errorKey = "error"
  13. _infoKey = "message"
  14. )
  15. var manager *Manager
  16. // A Manager represents a manager server
  17. type Manager struct {
  18. cfg *Config
  19. engine *gin.Engine
  20. adapter dbAdapter
  21. rwmu sync.RWMutex
  22. httpClient *http.Client
  23. }
  24. // GetTUNASyncManager returns the manager from config
  25. func GetTUNASyncManager(cfg *Config) *Manager {
  26. if manager != nil {
  27. return manager
  28. }
  29. // create gin engine
  30. if !cfg.Debug {
  31. gin.SetMode(gin.ReleaseMode)
  32. }
  33. s := &Manager{
  34. cfg: cfg,
  35. adapter: nil,
  36. }
  37. s.engine = gin.New()
  38. s.engine.Use(gin.Recovery())
  39. if cfg.Debug {
  40. s.engine.Use(gin.Logger())
  41. }
  42. if cfg.Files.CACert != "" {
  43. httpClient, err := CreateHTTPClient(cfg.Files.CACert)
  44. if err != nil {
  45. logger.Errorf("Error initializing HTTP client: %s", err.Error())
  46. return nil
  47. }
  48. s.httpClient = httpClient
  49. }
  50. if cfg.Files.DBFile != "" {
  51. adapter, err := makeDBAdapter(cfg.Files.DBType, cfg.Files.DBFile)
  52. if err != nil {
  53. logger.Errorf("Error initializing DB adapter: %s", err.Error())
  54. return nil
  55. }
  56. s.setDBAdapter(adapter)
  57. }
  58. // common log middleware
  59. s.engine.Use(contextErrorLogger)
  60. s.engine.GET("/ping", func(c *gin.Context) {
  61. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  62. })
  63. // list jobs, status page
  64. s.engine.GET("/jobs", s.listAllJobs)
  65. // flush disabled jobs
  66. s.engine.DELETE("/jobs/disabled", s.flushDisabledJobs)
  67. // list workers
  68. s.engine.GET("/workers", s.listWorkers)
  69. // worker online
  70. s.engine.POST("/workers", s.registerWorker)
  71. // workerID should be valid in this route group
  72. workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
  73. {
  74. // delete specified worker
  75. workerValidateGroup.DELETE(":id", s.deleteWorker)
  76. // get job list
  77. workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
  78. // post job status
  79. workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
  80. workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
  81. workerValidateGroup.POST(":id/schedules", s.updateSchedulesOfWorker)
  82. }
  83. // for tunasynctl to post commands
  84. s.engine.POST("/cmd", s.handleClientCmd)
  85. manager = s
  86. return s
  87. }
  88. func (s *Manager) setDBAdapter(adapter dbAdapter) {
  89. s.adapter = adapter
  90. }
  91. // Run runs the manager server forever
  92. func (s *Manager) Run() {
  93. addr := fmt.Sprintf("%s:%d", s.cfg.Server.Addr, s.cfg.Server.Port)
  94. httpServer := &http.Server{
  95. Addr: addr,
  96. Handler: s.engine,
  97. ReadTimeout: 10 * time.Second,
  98. WriteTimeout: 10 * time.Second,
  99. }
  100. if s.cfg.Server.SSLCert == "" && s.cfg.Server.SSLKey == "" {
  101. if err := httpServer.ListenAndServe(); err != nil {
  102. panic(err)
  103. }
  104. } else {
  105. if err := httpServer.ListenAndServeTLS(s.cfg.Server.SSLCert, s.cfg.Server.SSLKey); err != nil {
  106. panic(err)
  107. }
  108. }
  109. }
  110. // listAllJobs respond with all jobs of specified workers
  111. func (s *Manager) listAllJobs(c *gin.Context) {
  112. s.rwmu.RLock()
  113. mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
  114. s.rwmu.RUnlock()
  115. if err != nil {
  116. err := fmt.Errorf("failed to list all mirror status: %s",
  117. err.Error(),
  118. )
  119. c.Error(err)
  120. s.returnErrJSON(c, http.StatusInternalServerError, err)
  121. return
  122. }
  123. webMirStatusList := []WebMirrorStatus{}
  124. for _, m := range mirrorStatusList {
  125. webMirStatusList = append(
  126. webMirStatusList,
  127. BuildWebMirrorStatus(m),
  128. )
  129. }
  130. c.JSON(http.StatusOK, webMirStatusList)
  131. }
  132. // flushDisabledJobs deletes all jobs that marks as deleted
  133. func (s *Manager) flushDisabledJobs(c *gin.Context) {
  134. s.rwmu.Lock()
  135. err := s.adapter.FlushDisabledJobs()
  136. s.rwmu.Unlock()
  137. if err != nil {
  138. err := fmt.Errorf("failed to flush disabled jobs: %s",
  139. err.Error(),
  140. )
  141. c.Error(err)
  142. s.returnErrJSON(c, http.StatusInternalServerError, err)
  143. return
  144. }
  145. c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
  146. }
  147. // deleteWorker deletes one worker by id
  148. func (s *Manager) deleteWorker(c *gin.Context) {
  149. workerID := c.Param("id")
  150. s.rwmu.Lock()
  151. err := s.adapter.DeleteWorker(workerID)
  152. s.rwmu.Unlock()
  153. if err != nil {
  154. err := fmt.Errorf("failed to delete worker: %s",
  155. err.Error(),
  156. )
  157. c.Error(err)
  158. s.returnErrJSON(c, http.StatusInternalServerError, err)
  159. return
  160. }
  161. logger.Noticef("Worker <%s> deleted", workerID)
  162. c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
  163. }
  164. // listWorkers respond with information of all the workers
  165. func (s *Manager) listWorkers(c *gin.Context) {
  166. var workerInfos []WorkerStatus
  167. s.rwmu.RLock()
  168. workers, err := s.adapter.ListWorkers()
  169. s.rwmu.RUnlock()
  170. if err != nil {
  171. err := fmt.Errorf("failed to list workers: %s",
  172. err.Error(),
  173. )
  174. c.Error(err)
  175. s.returnErrJSON(c, http.StatusInternalServerError, err)
  176. return
  177. }
  178. for _, w := range workers {
  179. workerInfos = append(workerInfos,
  180. WorkerStatus{
  181. ID: w.ID,
  182. LastOnline: w.LastOnline,
  183. })
  184. }
  185. c.JSON(http.StatusOK, workerInfos)
  186. }
  187. // registerWorker register an newly-online worker
  188. func (s *Manager) registerWorker(c *gin.Context) {
  189. var _worker WorkerStatus
  190. c.BindJSON(&_worker)
  191. _worker.LastOnline = time.Now()
  192. newWorker, err := s.adapter.CreateWorker(_worker)
  193. if err != nil {
  194. err := fmt.Errorf("failed to register worker: %s",
  195. err.Error(),
  196. )
  197. c.Error(err)
  198. s.returnErrJSON(c, http.StatusInternalServerError, err)
  199. return
  200. }
  201. logger.Noticef("Worker <%s> registered", _worker.ID)
  202. // create workerCmd channel for this worker
  203. c.JSON(http.StatusOK, newWorker)
  204. }
  205. // listJobsOfWorker respond with all the jobs of the specified worker
  206. func (s *Manager) listJobsOfWorker(c *gin.Context) {
  207. workerID := c.Param("id")
  208. s.rwmu.RLock()
  209. mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
  210. s.rwmu.RUnlock()
  211. if err != nil {
  212. err := fmt.Errorf("failed to list jobs of worker %s: %s",
  213. workerID, err.Error(),
  214. )
  215. c.Error(err)
  216. s.returnErrJSON(c, http.StatusInternalServerError, err)
  217. return
  218. }
  219. c.JSON(http.StatusOK, mirrorStatusList)
  220. }
  221. func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
  222. c.JSON(code, gin.H{
  223. _errorKey: err.Error(),
  224. })
  225. }
  226. func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
  227. workerID := c.Param("id")
  228. var schedules MirrorSchedules
  229. c.BindJSON(&schedules)
  230. for _, schedule := range schedules.Schedules {
  231. mirrorName := schedule.MirrorName
  232. if len(mirrorName) == 0 {
  233. s.returnErrJSON(
  234. c, http.StatusBadRequest,
  235. errors.New("Mirror Name should not be empty"),
  236. )
  237. }
  238. s.rwmu.RLock()
  239. curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
  240. s.rwmu.RUnlock()
  241. if err != nil {
  242. fmt.Errorf("failed to get job %s of worker %s: %s",
  243. mirrorName, workerID, err.Error(),
  244. )
  245. continue
  246. }
  247. if curStatus.Scheduled == schedule.NextSchedule {
  248. // no changes, skip update
  249. continue
  250. }
  251. curStatus.Scheduled = schedule.NextSchedule
  252. s.rwmu.Lock()
  253. _, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
  254. s.rwmu.Unlock()
  255. if err != nil {
  256. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  257. mirrorName, workerID, err.Error(),
  258. )
  259. c.Error(err)
  260. s.returnErrJSON(c, http.StatusInternalServerError, err)
  261. return
  262. }
  263. }
  264. type empty struct{}
  265. c.JSON(http.StatusOK, empty{})
  266. }
  267. func (s *Manager) updateJobOfWorker(c *gin.Context) {
  268. workerID := c.Param("id")
  269. var status MirrorStatus
  270. c.BindJSON(&status)
  271. mirrorName := status.Name
  272. if len(mirrorName) == 0 {
  273. s.returnErrJSON(
  274. c, http.StatusBadRequest,
  275. errors.New("Mirror Name should not be empty"),
  276. )
  277. }
  278. s.rwmu.RLock()
  279. curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
  280. s.rwmu.RUnlock()
  281. curTime := time.Now()
  282. if status.Status == PreSyncing && curStatus.Status != PreSyncing {
  283. status.LastStarted = curTime
  284. } else {
  285. status.LastStarted = curStatus.LastStarted
  286. }
  287. // Only successful syncing needs last_update
  288. if status.Status == Success {
  289. status.LastUpdate = curTime
  290. } else {
  291. status.LastUpdate = curStatus.LastUpdate
  292. }
  293. if status.Status == Success || status.Status == Failed {
  294. status.LastEnded = curTime
  295. } else {
  296. status.LastEnded = curStatus.LastEnded
  297. }
  298. // Only message with meaningful size updates the mirror size
  299. if len(curStatus.Size) > 0 && curStatus.Size != "unknown" {
  300. if len(status.Size) == 0 || status.Size == "unknown" {
  301. status.Size = curStatus.Size
  302. }
  303. }
  304. // for logging
  305. switch status.Status {
  306. case Syncing:
  307. logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
  308. default:
  309. logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
  310. }
  311. s.rwmu.Lock()
  312. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  313. s.rwmu.Unlock()
  314. if err != nil {
  315. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  316. mirrorName, workerID, err.Error(),
  317. )
  318. c.Error(err)
  319. s.returnErrJSON(c, http.StatusInternalServerError, err)
  320. return
  321. }
  322. c.JSON(http.StatusOK, newStatus)
  323. }
  324. func (s *Manager) updateMirrorSize(c *gin.Context) {
  325. workerID := c.Param("id")
  326. type SizeMsg struct {
  327. Name string `json:"name"`
  328. Size string `json:"size"`
  329. }
  330. var msg SizeMsg
  331. c.BindJSON(&msg)
  332. mirrorName := msg.Name
  333. s.rwmu.RLock()
  334. status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
  335. s.rwmu.RUnlock()
  336. if err != nil {
  337. logger.Errorf(
  338. "Failed to get status of mirror %s @<%s>: %s",
  339. mirrorName, workerID, err.Error(),
  340. )
  341. s.returnErrJSON(c, http.StatusInternalServerError, err)
  342. return
  343. }
  344. // Only message with meaningful size updates the mirror size
  345. if len(msg.Size) > 0 || msg.Size != "unknown" {
  346. status.Size = msg.Size
  347. }
  348. logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
  349. s.rwmu.Lock()
  350. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  351. s.rwmu.Unlock()
  352. if err != nil {
  353. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  354. mirrorName, workerID, err.Error(),
  355. )
  356. c.Error(err)
  357. s.returnErrJSON(c, http.StatusInternalServerError, err)
  358. return
  359. }
  360. c.JSON(http.StatusOK, newStatus)
  361. }
  362. func (s *Manager) handleClientCmd(c *gin.Context) {
  363. var clientCmd ClientCmd
  364. c.BindJSON(&clientCmd)
  365. workerID := clientCmd.WorkerID
  366. if workerID == "" {
  367. // TODO: decide which worker should do this mirror when WorkerID is null string
  368. logger.Errorf("handleClientCmd case workerID == \" \" not implemented yet")
  369. c.AbortWithStatus(http.StatusInternalServerError)
  370. return
  371. }
  372. s.rwmu.RLock()
  373. w, err := s.adapter.GetWorker(workerID)
  374. s.rwmu.RUnlock()
  375. if err != nil {
  376. err := fmt.Errorf("worker %s is not registered yet", workerID)
  377. s.returnErrJSON(c, http.StatusBadRequest, err)
  378. return
  379. }
  380. workerURL := w.URL
  381. // parse client cmd into worker cmd
  382. workerCmd := WorkerCmd{
  383. Cmd: clientCmd.Cmd,
  384. MirrorID: clientCmd.MirrorID,
  385. Args: clientCmd.Args,
  386. Options: clientCmd.Options,
  387. }
  388. // update job status, even if the job did not disable successfully,
  389. // this status should be set as disabled
  390. s.rwmu.RLock()
  391. curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
  392. s.rwmu.RUnlock()
  393. changed := false
  394. switch clientCmd.Cmd {
  395. case CmdDisable:
  396. curStat.Status = Disabled
  397. changed = true
  398. case CmdStop:
  399. curStat.Status = Paused
  400. changed = true
  401. }
  402. if changed {
  403. s.rwmu.Lock()
  404. s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
  405. s.rwmu.Unlock()
  406. }
  407. logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)
  408. // post command to worker
  409. _, err = PostJSON(workerURL, workerCmd, s.httpClient)
  410. if err != nil {
  411. err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error())
  412. c.Error(err)
  413. s.returnErrJSON(c, http.StatusInternalServerError, err)
  414. return
  415. }
  416. // TODO: check response for success
  417. c.JSON(http.StatusOK, gin.H{_infoKey: "successfully send command to worker " + workerID})
  418. }