worker.go 10 KB


  1. package worker
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "html/template"
  7. "net/http"
  8. "path/filepath"
  9. "time"
  10. "github.com/gin-gonic/gin"
  11. . "github.com/tuna/tunasync/internal"
  12. )
  13. var tunasyncWorker *Worker
  14. // A Worker is a instance of tunasync worker
  15. type Worker struct {
  16. cfg *Config
  17. providers map[string]mirrorProvider
  18. jobs map[string]*mirrorJob
  19. managerChan chan jobMessage
  20. semaphore chan empty
  21. schedule *scheduleQueue
  22. httpEngine *gin.Engine
  23. httpClient *http.Client
  24. }
  25. // GetTUNASyncWorker returns a singalton worker
  26. func GetTUNASyncWorker(cfg *Config) *Worker {
  27. if tunasyncWorker != nil {
  28. return tunasyncWorker
  29. }
  30. w := &Worker{
  31. cfg: cfg,
  32. providers: make(map[string]mirrorProvider),
  33. jobs: make(map[string]*mirrorJob),
  34. managerChan: make(chan jobMessage, 32),
  35. semaphore: make(chan empty, cfg.Global.Concurrent),
  36. schedule: newScheduleQueue(),
  37. }
  38. if cfg.Manager.CACert != "" {
  39. httpClient, err := CreateHTTPClient(cfg.Manager.CACert)
  40. if err != nil {
  41. logger.Errorf("Error initializing HTTP client: %s", err.Error())
  42. return nil
  43. }
  44. w.httpClient = httpClient
  45. }
  46. w.initJobs()
  47. w.makeHTTPServer()
  48. tunasyncWorker = w
  49. return w
  50. }
  51. func (w *Worker) initProviders() {
  52. c := w.cfg
  53. formatLogDir := func(logDir string, m mirrorConfig) string {
  54. tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
  55. if err != nil {
  56. panic(err)
  57. }
  58. var formatedLogDir bytes.Buffer
  59. tmpl.Execute(&formatedLogDir, m)
  60. return formatedLogDir.String()
  61. }
  62. for _, mirror := range c.Mirrors {
  63. logDir := mirror.LogDir
  64. mirrorDir := mirror.MirrorDir
  65. if logDir == "" {
  66. logDir = c.Global.LogDir
  67. }
  68. if mirrorDir == "" {
  69. mirrorDir = filepath.Join(
  70. c.Global.MirrorDir, mirror.Name,
  71. )
  72. }
  73. if mirror.Interval == 0 {
  74. mirror.Interval = c.Global.Interval
  75. }
  76. logDir = formatLogDir(logDir, mirror)
  77. // IsMaster
  78. isMaster := true
  79. if mirror.Role == "slave" {
  80. isMaster = false
  81. } else {
  82. if mirror.Role != "" && mirror.Role != "master" {
  83. logger.Warningf("Invalid role configuration for %s", mirror.Name)
  84. }
  85. }
  86. var provider mirrorProvider
  87. switch mirror.Provider {
  88. case ProvCommand:
  89. pc := cmdConfig{
  90. name: mirror.Name,
  91. upstreamURL: mirror.Upstream,
  92. command: mirror.Command,
  93. workingDir: mirrorDir,
  94. logDir: logDir,
  95. logFile: filepath.Join(logDir, "latest.log"),
  96. interval: time.Duration(mirror.Interval) * time.Minute,
  97. env: mirror.Env,
  98. }
  99. p, err := newCmdProvider(pc)
  100. p.isMaster = isMaster
  101. if err != nil {
  102. panic(err)
  103. }
  104. provider = p
  105. case ProvRsync:
  106. rc := rsyncConfig{
  107. name: mirror.Name,
  108. upstreamURL: mirror.Upstream,
  109. rsyncCmd: mirror.Command,
  110. password: mirror.Password,
  111. excludeFile: mirror.ExcludeFile,
  112. workingDir: mirrorDir,
  113. logDir: logDir,
  114. logFile: filepath.Join(logDir, "latest.log"),
  115. useIPv6: mirror.UseIPv6,
  116. interval: time.Duration(mirror.Interval) * time.Minute,
  117. }
  118. p, err := newRsyncProvider(rc)
  119. p.isMaster = isMaster
  120. if err != nil {
  121. panic(err)
  122. }
  123. provider = p
  124. case ProvTwoStageRsync:
  125. rc := twoStageRsyncConfig{
  126. name: mirror.Name,
  127. stage1Profile: mirror.Stage1Profile,
  128. upstreamURL: mirror.Upstream,
  129. rsyncCmd: mirror.Command,
  130. password: mirror.Password,
  131. excludeFile: mirror.ExcludeFile,
  132. workingDir: mirrorDir,
  133. logDir: logDir,
  134. logFile: filepath.Join(logDir, "latest.log"),
  135. useIPv6: mirror.UseIPv6,
  136. interval: time.Duration(mirror.Interval) * time.Minute,
  137. }
  138. p, err := newTwoStageRsyncProvider(rc)
  139. p.isMaster = isMaster
  140. if err != nil {
  141. panic(err)
  142. }
  143. provider = p
  144. default:
  145. panic(errors.New("Invalid mirror provider"))
  146. }
  147. provider.AddHook(newLogLimiter(provider))
  148. // Add Cgroup Hook
  149. if w.cfg.Cgroup.Enable {
  150. provider.AddHook(
  151. newCgroupHook(provider, w.cfg.Cgroup.BasePath, w.cfg.Cgroup.Group),
  152. )
  153. }
  154. // ExecOnSuccess hook
  155. if mirror.ExecOnSuccess != "" {
  156. h, err := newExecPostHook(provider, execOnSuccess, mirror.ExecOnSuccess)
  157. if err != nil {
  158. logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
  159. panic(err)
  160. }
  161. provider.AddHook(h)
  162. }
  163. // ExecOnFailure hook
  164. if mirror.ExecOnFailure != "" {
  165. h, err := newExecPostHook(provider, execOnFailure, mirror.ExecOnFailure)
  166. if err != nil {
  167. logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
  168. panic(err)
  169. }
  170. provider.AddHook(h)
  171. }
  172. w.providers[provider.Name()] = provider
  173. }
  174. }
  175. func (w *Worker) initJobs() {
  176. w.initProviders()
  177. for name, provider := range w.providers {
  178. w.jobs[name] = newMirrorJob(provider)
  179. }
  180. }
  181. // Ctrl server receives commands from the manager
  182. func (w *Worker) makeHTTPServer() {
  183. s := gin.New()
  184. s.Use(gin.Recovery())
  185. s.POST("/", func(c *gin.Context) {
  186. var cmd WorkerCmd
  187. if err := c.BindJSON(&cmd); err != nil {
  188. c.JSON(http.StatusBadRequest, gin.H{"msg": "Invalid request"})
  189. return
  190. }
  191. job, ok := w.jobs[cmd.MirrorID]
  192. if !ok {
  193. c.JSON(http.StatusNotFound, gin.H{"msg": fmt.Sprintf("Mirror ``%s'' not found", cmd.MirrorID)})
  194. return
  195. }
  196. logger.Noticef("Received command: %v", cmd)
  197. // if job disabled, start them first
  198. switch cmd.Cmd {
  199. case CmdStart, CmdRestart:
  200. if job.State() == stateDisabled {
  201. go job.Run(w.managerChan, w.semaphore)
  202. }
  203. }
  204. switch cmd.Cmd {
  205. case CmdStart:
  206. job.ctrlChan <- jobStart
  207. case CmdRestart:
  208. job.ctrlChan <- jobRestart
  209. case CmdStop:
  210. // if job is disabled, no goroutine would be there
  211. // receiving this signal
  212. w.schedule.Remove(job.Name())
  213. if job.State() != stateDisabled {
  214. job.ctrlChan <- jobStop
  215. }
  216. case CmdDisable:
  217. w.schedule.Remove(job.Name())
  218. if job.State() != stateDisabled {
  219. job.ctrlChan <- jobDisable
  220. <-job.disabled
  221. }
  222. case CmdPing:
  223. job.ctrlChan <- jobStart
  224. default:
  225. c.JSON(http.StatusNotAcceptable, gin.H{"msg": "Invalid Command"})
  226. return
  227. }
  228. c.JSON(http.StatusOK, gin.H{"msg": "OK"})
  229. })
  230. w.httpEngine = s
  231. }
  232. func (w *Worker) runHTTPServer() {
  233. addr := fmt.Sprintf("%s:%d", w.cfg.Server.Addr, w.cfg.Server.Port)
  234. httpServer := &http.Server{
  235. Addr: addr,
  236. Handler: w.httpEngine,
  237. ReadTimeout: 10 * time.Second,
  238. WriteTimeout: 10 * time.Second,
  239. }
  240. if w.cfg.Server.SSLCert == "" && w.cfg.Server.SSLKey == "" {
  241. if err := httpServer.ListenAndServe(); err != nil {
  242. panic(err)
  243. }
  244. } else {
  245. if err := httpServer.ListenAndServeTLS(w.cfg.Server.SSLCert, w.cfg.Server.SSLKey); err != nil {
  246. panic(err)
  247. }
  248. }
  249. }
  250. // Run runs worker forever
  251. func (w *Worker) Run() {
  252. w.registorWorker()
  253. go w.runHTTPServer()
  254. w.runSchedule()
  255. }
  256. func (w *Worker) runSchedule() {
  257. mirrorList := w.fetchJobStatus()
  258. unset := make(map[string]bool)
  259. for name := range w.jobs {
  260. unset[name] = true
  261. }
  262. // Fetch mirror list stored in the manager
  263. // put it on the scheduled time
  264. // if it's disabled, ignore it
  265. for _, m := range mirrorList {
  266. if job, ok := w.jobs[m.Name]; ok {
  267. delete(unset, m.Name)
  268. switch m.Status {
  269. case Disabled:
  270. job.SetState(stateDisabled)
  271. continue
  272. case Paused:
  273. job.SetState(statePaused)
  274. go job.Run(w.managerChan, w.semaphore)
  275. continue
  276. default:
  277. job.SetState(stateReady)
  278. go job.Run(w.managerChan, w.semaphore)
  279. stime := m.LastUpdate.Add(job.provider.Interval())
  280. logger.Debugf("Scheduling job %s @%s", job.Name(), stime.Format("2006-01-02 15:04:05"))
  281. w.schedule.AddJob(stime, job)
  282. }
  283. }
  284. }
  285. // some new jobs may be added
  286. // which does not exist in the
  287. // manager's mirror list
  288. for name := range unset {
  289. job := w.jobs[name]
  290. job.SetState(stateReady)
  291. go job.Run(w.managerChan, w.semaphore)
  292. w.schedule.AddJob(time.Now(), job)
  293. }
  294. for {
  295. select {
  296. case jobMsg := <-w.managerChan:
  297. // got status update from job
  298. job := w.jobs[jobMsg.name]
  299. if job.State() != stateReady {
  300. logger.Infof("Job %s state is not ready, skip adding new schedule", jobMsg.name)
  301. continue
  302. }
  303. // syncing status is only meaningful when job
  304. // is running. If it's paused or disabled
  305. // a sync failure signal would be emitted
  306. // which needs to be ignored
  307. w.updateStatus(jobMsg)
  308. // only successful or the final failure msg
  309. // can trigger scheduling
  310. if jobMsg.schedule {
  311. schedTime := time.Now().Add(job.provider.Interval())
  312. logger.Noticef(
  313. "Next scheduled time for %s: %s",
  314. job.Name(),
  315. schedTime.Format("2006-01-02 15:04:05"),
  316. )
  317. w.schedule.AddJob(schedTime, job)
  318. }
  319. case <-time.Tick(5 * time.Second):
  320. // check schedule every 5 seconds
  321. if job := w.schedule.Pop(); job != nil {
  322. job.ctrlChan <- jobStart
  323. }
  324. }
  325. }
  326. }
  327. // Name returns worker name
  328. func (w *Worker) Name() string {
  329. return w.cfg.Global.Name
  330. }
  331. // URL returns the url to http server of the worker
  332. func (w *Worker) URL() string {
  333. proto := "https"
  334. if w.cfg.Server.SSLCert == "" && w.cfg.Server.SSLKey == "" {
  335. proto = "http"
  336. }
  337. return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port)
  338. }
  339. func (w *Worker) registorWorker() {
  340. url := fmt.Sprintf(
  341. "%s/workers",
  342. w.cfg.Manager.APIBase,
  343. )
  344. msg := WorkerStatus{
  345. ID: w.Name(),
  346. URL: w.URL(),
  347. }
  348. if _, err := PostJSON(url, msg, w.httpClient); err != nil {
  349. logger.Errorf("Failed to register worker")
  350. }
  351. }
  352. func (w *Worker) updateStatus(jobMsg jobMessage) {
  353. url := fmt.Sprintf(
  354. "%s/workers/%s/jobs/%s",
  355. w.cfg.Manager.APIBase,
  356. w.Name(),
  357. jobMsg.name,
  358. )
  359. p := w.providers[jobMsg.name]
  360. smsg := MirrorStatus{
  361. Name: jobMsg.name,
  362. Worker: w.cfg.Global.Name,
  363. IsMaster: p.IsMaster(),
  364. Status: jobMsg.status,
  365. Upstream: p.Upstream(),
  366. Size: "unknown",
  367. ErrorMsg: jobMsg.msg,
  368. }
  369. if _, err := PostJSON(url, smsg, w.httpClient); err != nil {
  370. logger.Errorf("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error())
  371. }
  372. }
  373. func (w *Worker) fetchJobStatus() []MirrorStatus {
  374. var mirrorList []MirrorStatus
  375. url := fmt.Sprintf(
  376. "%s/workers/%s/jobs",
  377. w.cfg.Manager.APIBase,
  378. w.Name(),
  379. )
  380. if _, err := GetJSON(url, &mirrorList, w.httpClient); err != nil {
  381. logger.Errorf("Failed to fetch job status: %s", err.Error())
  382. }
  383. return mirrorList
  384. }