provider.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package worker
  2. import (
  3. "bytes"
  4. "errors"
  5. "html/template"
  6. "path/filepath"
  7. "time"
  8. )
  9. // mirror provider is the wrapper of mirror jobs
  10. const (
  11. _WorkingDirKey = "working_dir"
  12. _LogDirKey = "log_dir"
  13. _LogFileKey = "log_file"
  14. )
  15. // A mirrorProvider instance
  16. type mirrorProvider interface {
  17. // name
  18. Name() string
  19. Upstream() string
  20. Type() providerEnum
  21. // run mirror job in background
  22. Run() error
  23. // run mirror job in background
  24. Start() error
  25. // Wait job to finish
  26. Wait() error
  27. // terminate mirror job
  28. Terminate() error
  29. // job hooks
  30. IsRunning() bool
  31. // Cgroup
  32. Cgroup() *cgroupHook
  33. AddHook(hook jobHook)
  34. Hooks() []jobHook
  35. Interval() time.Duration
  36. WorkingDir() string
  37. LogDir() string
  38. LogFile() string
  39. IsMaster() bool
  40. // enter context
  41. EnterContext() *Context
  42. // exit context
  43. ExitContext() *Context
  44. // return context
  45. Context() *Context
  46. }
  47. // newProvider creates a mirrorProvider instance
  48. // using a mirrorCfg and the global cfg
  49. func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
  50. formatLogDir := func(logDir string, m mirrorConfig) string {
  51. tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
  52. if err != nil {
  53. panic(err)
  54. }
  55. var formatedLogDir bytes.Buffer
  56. tmpl.Execute(&formatedLogDir, m)
  57. return formatedLogDir.String()
  58. }
  59. logDir := mirror.LogDir
  60. mirrorDir := mirror.MirrorDir
  61. if logDir == "" {
  62. logDir = cfg.Global.LogDir
  63. }
  64. if mirrorDir == "" {
  65. mirrorDir = filepath.Join(
  66. cfg.Global.MirrorDir, mirror.Name,
  67. )
  68. }
  69. if mirror.Interval == 0 {
  70. mirror.Interval = cfg.Global.Interval
  71. }
  72. logDir = formatLogDir(logDir, mirror)
  73. // IsMaster
  74. isMaster := true
  75. if mirror.Role == "slave" {
  76. isMaster = false
  77. } else {
  78. if mirror.Role != "" && mirror.Role != "master" {
  79. logger.Warningf("Invalid role configuration for %s", mirror.Name)
  80. }
  81. }
  82. var provider mirrorProvider
  83. switch mirror.Provider {
  84. case provCommand:
  85. pc := cmdConfig{
  86. name: mirror.Name,
  87. upstreamURL: mirror.Upstream,
  88. command: mirror.Command,
  89. workingDir: mirrorDir,
  90. logDir: logDir,
  91. logFile: filepath.Join(logDir, "latest.log"),
  92. interval: time.Duration(mirror.Interval) * time.Minute,
  93. env: mirror.Env,
  94. }
  95. p, err := newCmdProvider(pc)
  96. p.isMaster = isMaster
  97. if err != nil {
  98. panic(err)
  99. }
  100. provider = p
  101. case provRsync:
  102. rc := rsyncConfig{
  103. name: mirror.Name,
  104. upstreamURL: mirror.Upstream,
  105. rsyncCmd: mirror.Command,
  106. password: mirror.Password,
  107. excludeFile: mirror.ExcludeFile,
  108. workingDir: mirrorDir,
  109. logDir: logDir,
  110. logFile: filepath.Join(logDir, "latest.log"),
  111. useIPv6: mirror.UseIPv6,
  112. interval: time.Duration(mirror.Interval) * time.Minute,
  113. }
  114. p, err := newRsyncProvider(rc)
  115. p.isMaster = isMaster
  116. if err != nil {
  117. panic(err)
  118. }
  119. provider = p
  120. case provTwoStageRsync:
  121. rc := twoStageRsyncConfig{
  122. name: mirror.Name,
  123. stage1Profile: mirror.Stage1Profile,
  124. upstreamURL: mirror.Upstream,
  125. rsyncCmd: mirror.Command,
  126. password: mirror.Password,
  127. excludeFile: mirror.ExcludeFile,
  128. workingDir: mirrorDir,
  129. logDir: logDir,
  130. logFile: filepath.Join(logDir, "latest.log"),
  131. useIPv6: mirror.UseIPv6,
  132. interval: time.Duration(mirror.Interval) * time.Minute,
  133. }
  134. p, err := newTwoStageRsyncProvider(rc)
  135. p.isMaster = isMaster
  136. if err != nil {
  137. panic(err)
  138. }
  139. provider = p
  140. default:
  141. panic(errors.New("Invalid mirror provider"))
  142. }
  143. // Add Logging Hook
  144. provider.AddHook(newLogLimiter(provider))
  145. // Add Cgroup Hook
  146. if cfg.Cgroup.Enable {
  147. provider.AddHook(
  148. newCgroupHook(provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group),
  149. )
  150. }
  151. // ExecOnSuccess hook
  152. if mirror.ExecOnSuccess != "" {
  153. h, err := newExecPostHook(provider, execOnSuccess, mirror.ExecOnSuccess)
  154. if err != nil {
  155. logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
  156. panic(err)
  157. }
  158. provider.AddHook(h)
  159. }
  160. // ExecOnFailure hook
  161. if mirror.ExecOnFailure != "" {
  162. h, err := newExecPostHook(provider, execOnFailure, mirror.ExecOnFailure)
  163. if err != nil {
  164. logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
  165. panic(err)
  166. }
  167. provider.AddHook(h)
  168. }
  169. return provider
  170. }