worker.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. package worker
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "errors"
  6. "fmt"
  7. "html/template"
  8. "net/http"
  9. "path/filepath"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. . "github.com/tuna/tunasync/internal"
  13. )
  14. var tunasyncWorker *Worker
  15. // A Worker is a instance of tunasync worker
  16. type Worker struct {
  17. cfg *Config
  18. providers map[string]mirrorProvider
  19. jobs map[string]*mirrorJob
  20. managerChan chan jobMessage
  21. semaphore chan empty
  22. schedule *scheduleQueue
  23. httpServer *gin.Engine
  24. tlsConfig *tls.Config
  25. mirrorStatus map[string]SyncStatus
  26. }
  27. // GetTUNASyncWorker returns a singalton worker
  28. func GetTUNASyncWorker(cfg *Config) *Worker {
  29. if tunasyncWorker != nil {
  30. return tunasyncWorker
  31. }
  32. w := &Worker{
  33. cfg: cfg,
  34. providers: make(map[string]mirrorProvider),
  35. jobs: make(map[string]*mirrorJob),
  36. managerChan: make(chan jobMessage, 32),
  37. semaphore: make(chan empty, cfg.Global.Concurrent),
  38. schedule: newScheduleQueue(),
  39. mirrorStatus: make(map[string]SyncStatus),
  40. }
  41. if cfg.Manager.CACert != "" {
  42. tlsConfig, err := GetTLSConfig(cfg.Manager.CACert)
  43. if err != nil {
  44. logger.Error("Failed to init TLS config: %s", err.Error())
  45. return nil
  46. }
  47. w.tlsConfig = tlsConfig
  48. }
  49. w.initJobs()
  50. w.makeHTTPServer()
  51. tunasyncWorker = w
  52. return w
  53. }
  54. func (w *Worker) initProviders() {
  55. c := w.cfg
  56. formatLogDir := func(logDir string, m mirrorConfig) string {
  57. tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
  58. if err != nil {
  59. panic(err)
  60. }
  61. var formatedLogDir bytes.Buffer
  62. tmpl.Execute(&formatedLogDir, m)
  63. return formatedLogDir.String()
  64. }
  65. for _, mirror := range c.Mirrors {
  66. logDir := mirror.LogDir
  67. mirrorDir := mirror.MirrorDir
  68. if logDir == "" {
  69. logDir = c.Global.LogDir
  70. }
  71. if mirrorDir == "" {
  72. mirrorDir = filepath.Join(
  73. c.Global.MirrorDir, mirror.Name,
  74. )
  75. }
  76. logDir = formatLogDir(logDir, mirror)
  77. var provider mirrorProvider
  78. switch mirror.Provider {
  79. case ProvCommand:
  80. pc := cmdConfig{
  81. name: mirror.Name,
  82. upstreamURL: mirror.Upstream,
  83. command: mirror.Command,
  84. workingDir: mirrorDir,
  85. logDir: logDir,
  86. logFile: filepath.Join(logDir, "latest.log"),
  87. interval: time.Duration(mirror.Interval) * time.Minute,
  88. env: mirror.Env,
  89. }
  90. p, err := newCmdProvider(pc)
  91. if err != nil {
  92. panic(err)
  93. }
  94. provider = p
  95. case ProvRsync:
  96. rc := rsyncConfig{
  97. name: mirror.Name,
  98. upstreamURL: mirror.Upstream,
  99. rsyncCmd: mirror.Command,
  100. password: mirror.Password,
  101. excludeFile: mirror.ExcludeFile,
  102. workingDir: mirrorDir,
  103. logDir: logDir,
  104. logFile: filepath.Join(logDir, "latest.log"),
  105. useIPv6: mirror.UseIPv6,
  106. interval: time.Duration(mirror.Interval) * time.Minute,
  107. }
  108. p, err := newRsyncProvider(rc)
  109. if err != nil {
  110. panic(err)
  111. }
  112. provider = p
  113. case ProvTwoStageRsync:
  114. rc := twoStageRsyncConfig{
  115. name: mirror.Name,
  116. stage1Profile: mirror.Stage1Profile,
  117. upstreamURL: mirror.Upstream,
  118. rsyncCmd: mirror.Command,
  119. password: mirror.Password,
  120. excludeFile: mirror.ExcludeFile,
  121. workingDir: mirrorDir,
  122. logDir: logDir,
  123. logFile: filepath.Join(logDir, "latest.log"),
  124. useIPv6: mirror.UseIPv6,
  125. interval: time.Duration(mirror.Interval) * time.Minute,
  126. }
  127. p, err := newTwoStageRsyncProvider(rc)
  128. if err != nil {
  129. panic(err)
  130. }
  131. provider = p
  132. default:
  133. panic(errors.New("Invalid mirror provider"))
  134. }
  135. provider.AddHook(newLogLimiter(provider))
  136. w.providers[provider.Name()] = provider
  137. }
  138. }
  139. func (w *Worker) initJobs() {
  140. w.initProviders()
  141. for name, provider := range w.providers {
  142. w.jobs[name] = newMirrorJob(provider)
  143. go w.jobs[name].Run(w.managerChan, w.semaphore)
  144. w.mirrorStatus[name] = Paused
  145. }
  146. }
  147. // Ctrl server receives commands from the manager
  148. func (w *Worker) makeHTTPServer() {
  149. s := gin.New()
  150. s.Use(gin.Recovery())
  151. s.POST("/", func(c *gin.Context) {
  152. var cmd WorkerCmd
  153. if err := c.BindJSON(&cmd); err != nil {
  154. c.JSON(http.StatusBadRequest, gin.H{"msg": "Invalid request"})
  155. return
  156. }
  157. job, ok := w.jobs[cmd.MirrorID]
  158. if !ok {
  159. c.JSON(http.StatusNotFound, gin.H{"msg": fmt.Sprintf("Mirror ``%s'' not found", cmd.MirrorID)})
  160. return
  161. }
  162. // if job disabled, start them first
  163. switch cmd.Cmd {
  164. case CmdStart, CmdRestart:
  165. if job.Disabled() {
  166. go job.Run(w.managerChan, w.semaphore)
  167. }
  168. }
  169. switch cmd.Cmd {
  170. case CmdStart:
  171. job.ctrlChan <- jobStart
  172. case CmdStop:
  173. job.ctrlChan <- jobStop
  174. case CmdRestart:
  175. job.ctrlChan <- jobRestart
  176. case CmdDisable:
  177. w.schedule.Remove(job.Name())
  178. job.ctrlChan <- jobDisable
  179. <-job.disabled
  180. case CmdPing:
  181. job.ctrlChan <- jobStart
  182. default:
  183. c.JSON(http.StatusNotAcceptable, gin.H{"msg": "Invalid Command"})
  184. return
  185. }
  186. c.JSON(http.StatusOK, gin.H{"msg": "OK"})
  187. })
  188. w.httpServer = s
  189. }
  190. func (w *Worker) runHTTPServer() {
  191. addr := fmt.Sprintf("%s:%d", w.cfg.Server.Addr, w.cfg.Server.Port)
  192. if w.cfg.Server.SSLCert == "" && w.cfg.Server.SSLKey == "" {
  193. if err := w.httpServer.Run(addr); err != nil {
  194. panic(err)
  195. }
  196. } else {
  197. if err := w.httpServer.RunTLS(addr, w.cfg.Server.SSLCert, w.cfg.Server.SSLKey); err != nil {
  198. panic(err)
  199. }
  200. }
  201. }
  202. // Run runs worker forever
  203. func (w *Worker) Run() {
  204. w.registorWorker()
  205. go w.runHTTPServer()
  206. w.runSchedule()
  207. }
  208. func (w *Worker) runSchedule() {
  209. mirrorList := w.fetchJobStatus()
  210. unset := make(map[string]bool)
  211. for name := range w.jobs {
  212. unset[name] = true
  213. }
  214. for _, m := range mirrorList {
  215. if job, ok := w.jobs[m.Name]; ok {
  216. stime := m.LastUpdate.Add(job.provider.Interval())
  217. w.schedule.AddJob(stime, job)
  218. delete(unset, m.Name)
  219. }
  220. }
  221. for name := range unset {
  222. job := w.jobs[name]
  223. w.schedule.AddJob(time.Now(), job)
  224. }
  225. for {
  226. select {
  227. case jobMsg := <-w.managerChan:
  228. // got status update from job
  229. w.updateStatus(jobMsg)
  230. status := w.mirrorStatus[jobMsg.name]
  231. if status == Disabled || status == Paused {
  232. continue
  233. }
  234. w.mirrorStatus[jobMsg.name] = jobMsg.status
  235. switch jobMsg.status {
  236. case Success, Failed:
  237. job := w.jobs[jobMsg.name]
  238. w.schedule.AddJob(
  239. time.Now().Add(job.provider.Interval()),
  240. job,
  241. )
  242. }
  243. case <-time.Tick(10 * time.Second):
  244. if job := w.schedule.Pop(); job != nil {
  245. job.ctrlChan <- jobStart
  246. }
  247. }
  248. }
  249. }
  250. // Name returns worker name
  251. func (w *Worker) Name() string {
  252. return w.cfg.Global.Name
  253. }
  254. // URL returns the url to http server of the worker
  255. func (w *Worker) URL() string {
  256. proto := "https"
  257. if w.cfg.Server.SSLCert == "" && w.cfg.Server.SSLKey == "" {
  258. proto = "http"
  259. }
  260. return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port)
  261. }
  262. func (w *Worker) registorWorker() {
  263. url := fmt.Sprintf(
  264. "%s/workers",
  265. w.cfg.Manager.APIBase,
  266. )
  267. msg := WorkerStatus{
  268. ID: w.Name(),
  269. URL: w.URL(),
  270. }
  271. if _, err := PostJSON(url, msg, w.tlsConfig); err != nil {
  272. logger.Error("Failed to register worker")
  273. }
  274. }
  275. func (w *Worker) updateStatus(jobMsg jobMessage) {
  276. url := fmt.Sprintf(
  277. "%s/workers/%s/jobs/%s",
  278. w.cfg.Manager.APIBase,
  279. w.Name(),
  280. jobMsg.name,
  281. )
  282. p := w.providers[jobMsg.name]
  283. smsg := MirrorStatus{
  284. Name: jobMsg.name,
  285. Worker: w.cfg.Global.Name,
  286. IsMaster: true,
  287. Status: jobMsg.status,
  288. LastUpdate: time.Now(),
  289. Upstream: p.Upstream(),
  290. Size: "unknown",
  291. ErrorMsg: jobMsg.msg,
  292. }
  293. if _, err := PostJSON(url, smsg, w.tlsConfig); err != nil {
  294. logger.Error("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error())
  295. }
  296. }
  297. func (w *Worker) fetchJobStatus() []MirrorStatus {
  298. var mirrorList []MirrorStatus
  299. url := fmt.Sprintf(
  300. "%s/workers/%s/jobs",
  301. w.cfg.Manager.APIBase,
  302. w.Name(),
  303. )
  304. if _, err := GetJSON(url, &mirrorList, w.tlsConfig); err != nil {
  305. logger.Error("Failed to fetch job status: %s", err.Error())
  306. }
  307. return mirrorList
  308. }