server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. package manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/gin-gonic/gin"
  9. . "github.com/tuna/tunasync/internal"
  10. )
  11. const (
  12. _errorKey = "error"
  13. _infoKey = "message"
  14. )
  15. var manager *Manager
  16. // A Manager represents a manager server
  17. type Manager struct {
  18. cfg *Config
  19. engine *gin.Engine
  20. adapter dbAdapter
  21. rwmu sync.RWMutex
  22. httpClient *http.Client
  23. }
  24. // GetTUNASyncManager returns the manager from config
  25. func GetTUNASyncManager(cfg *Config) *Manager {
  26. if manager != nil {
  27. return manager
  28. }
  29. // create gin engine
  30. if !cfg.Debug {
  31. gin.SetMode(gin.ReleaseMode)
  32. }
  33. s := &Manager{
  34. cfg: cfg,
  35. adapter: nil,
  36. }
  37. s.engine = gin.New()
  38. s.engine.Use(gin.Recovery())
  39. if cfg.Debug {
  40. s.engine.Use(gin.Logger())
  41. }
  42. if cfg.Files.CACert != "" {
  43. httpClient, err := CreateHTTPClient(cfg.Files.CACert)
  44. if err != nil {
  45. logger.Errorf("Error initializing HTTP client: %s", err.Error())
  46. return nil
  47. }
  48. s.httpClient = httpClient
  49. }
  50. if cfg.Files.DBFile != "" {
  51. adapter, err := makeDBAdapter(cfg.Files.DBType, cfg.Files.DBFile)
  52. if err != nil {
  53. logger.Errorf("Error initializing DB adapter: %s", err.Error())
  54. return nil
  55. }
  56. s.setDBAdapter(adapter)
  57. }
  58. // common log middleware
  59. s.engine.Use(contextErrorLogger)
  60. s.engine.GET("/ping", func(c *gin.Context) {
  61. c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
  62. })
  63. // list jobs, status page
  64. s.engine.GET("/jobs", s.listAllJobs)
  65. // flush disabled jobs
  66. s.engine.DELETE("/jobs/disabled", s.flushDisabledJobs)
  67. // list workers
  68. s.engine.GET("/workers", s.listWorkers)
  69. // worker online
  70. s.engine.POST("/workers", s.registerWorker)
  71. // workerID should be valid in this route group
  72. workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
  73. {
  74. // delete specified worker
  75. workerValidateGroup.DELETE(":id", s.deleteWorker)
  76. // get job list
  77. workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
  78. // post job status
  79. workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
  80. workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
  81. workerValidateGroup.POST(":id/schedules", s.updateSchedulesOfWorker)
  82. }
  83. // for tunasynctl to post commands
  84. s.engine.POST("/cmd", s.handleClientCmd)
  85. manager = s
  86. return s
  87. }
  88. func (s *Manager) setDBAdapter(adapter dbAdapter) {
  89. s.adapter = adapter
  90. }
  91. // Run runs the manager server forever
  92. func (s *Manager) Run() {
  93. addr := fmt.Sprintf("%s:%d", s.cfg.Server.Addr, s.cfg.Server.Port)
  94. httpServer := &http.Server{
  95. Addr: addr,
  96. Handler: s.engine,
  97. ReadTimeout: 10 * time.Second,
  98. WriteTimeout: 10 * time.Second,
  99. }
  100. if s.cfg.Server.SSLCert == "" && s.cfg.Server.SSLKey == "" {
  101. if err := httpServer.ListenAndServe(); err != nil {
  102. panic(err)
  103. }
  104. } else {
  105. if err := httpServer.ListenAndServeTLS(s.cfg.Server.SSLCert, s.cfg.Server.SSLKey); err != nil {
  106. panic(err)
  107. }
  108. }
  109. }
  110. // listAllJobs respond with all jobs of specified workers
  111. func (s *Manager) listAllJobs(c *gin.Context) {
  112. s.rwmu.RLock()
  113. mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
  114. s.rwmu.RUnlock()
  115. if err != nil {
  116. err := fmt.Errorf("failed to list all mirror status: %s",
  117. err.Error(),
  118. )
  119. c.Error(err)
  120. s.returnErrJSON(c, http.StatusInternalServerError, err)
  121. return
  122. }
  123. webMirStatusList := []WebMirrorStatus{}
  124. for _, m := range mirrorStatusList {
  125. webMirStatusList = append(
  126. webMirStatusList,
  127. BuildWebMirrorStatus(m),
  128. )
  129. }
  130. c.JSON(http.StatusOK, webMirStatusList)
  131. }
  132. // flushDisabledJobs deletes all jobs that marks as deleted
  133. func (s *Manager) flushDisabledJobs(c *gin.Context) {
  134. s.rwmu.Lock()
  135. err := s.adapter.FlushDisabledJobs()
  136. s.rwmu.Unlock()
  137. if err != nil {
  138. err := fmt.Errorf("failed to flush disabled jobs: %s",
  139. err.Error(),
  140. )
  141. c.Error(err)
  142. s.returnErrJSON(c, http.StatusInternalServerError, err)
  143. return
  144. }
  145. c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
  146. }
  147. // deleteWorker deletes one worker by id
  148. func (s *Manager) deleteWorker(c *gin.Context) {
  149. workerID := c.Param("id")
  150. s.rwmu.Lock()
  151. err := s.adapter.DeleteWorker(workerID)
  152. s.rwmu.Unlock()
  153. if err != nil {
  154. err := fmt.Errorf("failed to delete worker: %s",
  155. err.Error(),
  156. )
  157. c.Error(err)
  158. s.returnErrJSON(c, http.StatusInternalServerError, err)
  159. return
  160. }
  161. logger.Noticef("Worker <%s> deleted", workerID)
  162. c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
  163. }
  164. // listWorkers respond with information of all the workers
  165. func (s *Manager) listWorkers(c *gin.Context) {
  166. var workerInfos []WorkerStatus
  167. s.rwmu.RLock()
  168. workers, err := s.adapter.ListWorkers()
  169. s.rwmu.RUnlock()
  170. if err != nil {
  171. err := fmt.Errorf("failed to list workers: %s",
  172. err.Error(),
  173. )
  174. c.Error(err)
  175. s.returnErrJSON(c, http.StatusInternalServerError, err)
  176. return
  177. }
  178. for _, w := range workers {
  179. workerInfos = append(workerInfos,
  180. WorkerStatus{
  181. ID: w.ID,
  182. URL: w.URL,
  183. Token: "REDACTED",
  184. LastOnline: w.LastOnline,
  185. LastRegister: w.LastRegister,
  186. })
  187. }
  188. c.JSON(http.StatusOK, workerInfos)
  189. }
  190. // registerWorker register an newly-online worker
  191. func (s *Manager) registerWorker(c *gin.Context) {
  192. var _worker WorkerStatus
  193. c.BindJSON(&_worker)
  194. _worker.LastOnline = time.Now()
  195. _worker.LastRegister = time.Now()
  196. newWorker, err := s.adapter.CreateWorker(_worker)
  197. if err != nil {
  198. err := fmt.Errorf("failed to register worker: %s",
  199. err.Error(),
  200. )
  201. c.Error(err)
  202. s.returnErrJSON(c, http.StatusInternalServerError, err)
  203. return
  204. }
  205. logger.Noticef("Worker <%s> registered", _worker.ID)
  206. // create workerCmd channel for this worker
  207. c.JSON(http.StatusOK, newWorker)
  208. }
  209. // listJobsOfWorker respond with all the jobs of the specified worker
  210. func (s *Manager) listJobsOfWorker(c *gin.Context) {
  211. workerID := c.Param("id")
  212. s.rwmu.RLock()
  213. mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
  214. s.rwmu.RUnlock()
  215. if err != nil {
  216. err := fmt.Errorf("failed to list jobs of worker %s: %s",
  217. workerID, err.Error(),
  218. )
  219. c.Error(err)
  220. s.returnErrJSON(c, http.StatusInternalServerError, err)
  221. return
  222. }
  223. c.JSON(http.StatusOK, mirrorStatusList)
  224. }
  225. func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
  226. c.JSON(code, gin.H{
  227. _errorKey: err.Error(),
  228. })
  229. }
  230. func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
  231. workerID := c.Param("id")
  232. var schedules MirrorSchedules
  233. c.BindJSON(&schedules)
  234. for _, schedule := range schedules.Schedules {
  235. mirrorName := schedule.MirrorName
  236. if len(mirrorName) == 0 {
  237. s.returnErrJSON(
  238. c, http.StatusBadRequest,
  239. errors.New("mirror Name should not be empty"),
  240. )
  241. }
  242. s.rwmu.RLock()
  243. s.adapter.RefreshWorker(workerID)
  244. curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
  245. s.rwmu.RUnlock()
  246. if err != nil {
  247. logger.Errorf("failed to get job %s of worker %s: %s",
  248. mirrorName, workerID, err.Error(),
  249. )
  250. continue
  251. }
  252. if curStatus.Scheduled == schedule.NextSchedule {
  253. // no changes, skip update
  254. continue
  255. }
  256. curStatus.Scheduled = schedule.NextSchedule
  257. s.rwmu.Lock()
  258. _, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
  259. s.rwmu.Unlock()
  260. if err != nil {
  261. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  262. mirrorName, workerID, err.Error(),
  263. )
  264. c.Error(err)
  265. s.returnErrJSON(c, http.StatusInternalServerError, err)
  266. return
  267. }
  268. }
  269. type empty struct{}
  270. c.JSON(http.StatusOK, empty{})
  271. }
  272. func (s *Manager) updateJobOfWorker(c *gin.Context) {
  273. workerID := c.Param("id")
  274. var status MirrorStatus
  275. c.BindJSON(&status)
  276. mirrorName := status.Name
  277. if len(mirrorName) == 0 {
  278. s.returnErrJSON(
  279. c, http.StatusBadRequest,
  280. errors.New("mirror Name should not be empty"),
  281. )
  282. }
  283. s.rwmu.RLock()
  284. s.adapter.RefreshWorker(workerID)
  285. curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
  286. s.rwmu.RUnlock()
  287. curTime := time.Now()
  288. if status.Status == PreSyncing && curStatus.Status != PreSyncing {
  289. status.LastStarted = curTime
  290. } else {
  291. status.LastStarted = curStatus.LastStarted
  292. }
  293. // Only successful syncing needs last_update
  294. if status.Status == Success {
  295. status.LastUpdate = curTime
  296. } else {
  297. status.LastUpdate = curStatus.LastUpdate
  298. }
  299. if status.Status == Success || status.Status == Failed {
  300. status.LastEnded = curTime
  301. } else {
  302. status.LastEnded = curStatus.LastEnded
  303. }
  304. // Only message with meaningful size updates the mirror size
  305. if len(curStatus.Size) > 0 && curStatus.Size != "unknown" {
  306. if len(status.Size) == 0 || status.Size == "unknown" {
  307. status.Size = curStatus.Size
  308. }
  309. }
  310. // for logging
  311. switch status.Status {
  312. case Syncing:
  313. logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
  314. default:
  315. logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
  316. }
  317. s.rwmu.Lock()
  318. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  319. s.rwmu.Unlock()
  320. if err != nil {
  321. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  322. mirrorName, workerID, err.Error(),
  323. )
  324. c.Error(err)
  325. s.returnErrJSON(c, http.StatusInternalServerError, err)
  326. return
  327. }
  328. c.JSON(http.StatusOK, newStatus)
  329. }
  330. func (s *Manager) updateMirrorSize(c *gin.Context) {
  331. workerID := c.Param("id")
  332. type SizeMsg struct {
  333. Name string `json:"name"`
  334. Size string `json:"size"`
  335. }
  336. var msg SizeMsg
  337. c.BindJSON(&msg)
  338. mirrorName := msg.Name
  339. s.rwmu.RLock()
  340. s.adapter.RefreshWorker(workerID)
  341. status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
  342. s.rwmu.RUnlock()
  343. if err != nil {
  344. logger.Errorf(
  345. "Failed to get status of mirror %s @<%s>: %s",
  346. mirrorName, workerID, err.Error(),
  347. )
  348. s.returnErrJSON(c, http.StatusInternalServerError, err)
  349. return
  350. }
  351. // Only message with meaningful size updates the mirror size
  352. if len(msg.Size) > 0 || msg.Size != "unknown" {
  353. status.Size = msg.Size
  354. }
  355. logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
  356. s.rwmu.Lock()
  357. newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
  358. s.rwmu.Unlock()
  359. if err != nil {
  360. err := fmt.Errorf("failed to update job %s of worker %s: %s",
  361. mirrorName, workerID, err.Error(),
  362. )
  363. c.Error(err)
  364. s.returnErrJSON(c, http.StatusInternalServerError, err)
  365. return
  366. }
  367. c.JSON(http.StatusOK, newStatus)
  368. }
  369. func (s *Manager) handleClientCmd(c *gin.Context) {
  370. var clientCmd ClientCmd
  371. c.BindJSON(&clientCmd)
  372. workerID := clientCmd.WorkerID
  373. if workerID == "" {
  374. // TODO: decide which worker should do this mirror when WorkerID is null string
  375. logger.Errorf("handleClientCmd case workerID == \" \" not implemented yet")
  376. c.AbortWithStatus(http.StatusInternalServerError)
  377. return
  378. }
  379. s.rwmu.RLock()
  380. w, err := s.adapter.GetWorker(workerID)
  381. s.rwmu.RUnlock()
  382. if err != nil {
  383. err := fmt.Errorf("worker %s is not registered yet", workerID)
  384. s.returnErrJSON(c, http.StatusBadRequest, err)
  385. return
  386. }
  387. workerURL := w.URL
  388. // parse client cmd into worker cmd
  389. workerCmd := WorkerCmd{
  390. Cmd: clientCmd.Cmd,
  391. MirrorID: clientCmd.MirrorID,
  392. Args: clientCmd.Args,
  393. Options: clientCmd.Options,
  394. }
  395. // update job status, even if the job did not disable successfully,
  396. // this status should be set as disabled
  397. s.rwmu.RLock()
  398. curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
  399. s.rwmu.RUnlock()
  400. changed := false
  401. switch clientCmd.Cmd {
  402. case CmdDisable:
  403. curStat.Status = Disabled
  404. changed = true
  405. case CmdStop:
  406. curStat.Status = Paused
  407. changed = true
  408. }
  409. if changed {
  410. s.rwmu.Lock()
  411. s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
  412. s.rwmu.Unlock()
  413. }
  414. logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)
  415. // post command to worker
  416. _, err = PostJSON(workerURL, workerCmd, s.httpClient)
  417. if err != nil {
  418. err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error())
  419. c.Error(err)
  420. s.returnErrJSON(c, http.StatusInternalServerError, err)
  421. return
  422. }
  423. // TODO: check response for success
  424. c.JSON(http.StatusOK, gin.H{_infoKey: "successfully send command to worker " + workerID})
  425. }