provider.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package worker
  2. import (
  3. "bytes"
  4. "errors"
  5. "html/template"
  6. "path/filepath"
  7. "time"
  8. )
// A mirror provider is a wrapper around a mirror job.
  10. const (
  11. _WorkingDirKey = "working_dir"
  12. _LogDirKey = "log_dir"
  13. _LogFileKey = "log_file"
  14. )
  15. // A mirrorProvider instance
  16. type mirrorProvider interface {
  17. // name
  18. Name() string
  19. Upstream() string
  20. Type() providerEnum
  21. // run mirror job in background
  22. Run() error
  23. // run mirror job in background
  24. Start() error
  25. // Wait job to finish
  26. Wait() error
  27. // terminate mirror job
  28. Terminate() error
  29. // job hooks
  30. IsRunning() bool
  31. // Cgroup
  32. Cgroup() *cgroupHook
  33. // ZFS
  34. ZFS() *zfsHook
  35. // Docker
  36. Docker() *dockerHook
  37. AddHook(hook jobHook)
  38. Hooks() []jobHook
  39. Interval() time.Duration
  40. WorkingDir() string
  41. LogDir() string
  42. LogFile() string
  43. IsMaster() bool
  44. DataSize() string
  45. // enter context
  46. EnterContext() *Context
  47. // exit context
  48. ExitContext() *Context
  49. // return context
  50. Context() *Context
  51. }
  52. // newProvider creates a mirrorProvider instance
  53. // using a mirrorCfg and the global cfg
  54. func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
  55. formatLogDir := func(logDir string, m mirrorConfig) string {
  56. tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
  57. if err != nil {
  58. panic(err)
  59. }
  60. var formatedLogDir bytes.Buffer
  61. tmpl.Execute(&formatedLogDir, m)
  62. return formatedLogDir.String()
  63. }
  64. logDir := mirror.LogDir
  65. mirrorDir := mirror.MirrorDir
  66. if logDir == "" {
  67. logDir = cfg.Global.LogDir
  68. }
  69. if mirrorDir == "" {
  70. mirrorDir = filepath.Join(
  71. cfg.Global.MirrorDir, mirror.Name,
  72. )
  73. }
  74. if mirror.Interval == 0 {
  75. mirror.Interval = cfg.Global.Interval
  76. }
  77. logDir = formatLogDir(logDir, mirror)
  78. // IsMaster
  79. isMaster := true
  80. if mirror.Role == "slave" {
  81. isMaster = false
  82. } else {
  83. if mirror.Role != "" && mirror.Role != "master" {
  84. logger.Warningf("Invalid role configuration for %s", mirror.Name)
  85. }
  86. }
  87. var provider mirrorProvider
  88. switch mirror.Provider {
  89. case provCommand:
  90. pc := cmdConfig{
  91. name: mirror.Name,
  92. upstreamURL: mirror.Upstream,
  93. command: mirror.Command,
  94. workingDir: mirrorDir,
  95. logDir: logDir,
  96. logFile: filepath.Join(logDir, "latest.log"),
  97. interval: time.Duration(mirror.Interval) * time.Minute,
  98. env: mirror.Env,
  99. }
  100. p, err := newCmdProvider(pc)
  101. p.isMaster = isMaster
  102. if err != nil {
  103. panic(err)
  104. }
  105. provider = p
  106. case provRsync:
  107. rc := rsyncConfig{
  108. name: mirror.Name,
  109. upstreamURL: mirror.Upstream,
  110. rsyncCmd: mirror.Command,
  111. username: mirror.Username,
  112. password: mirror.Password,
  113. excludeFile: mirror.ExcludeFile,
  114. workingDir: mirrorDir,
  115. logDir: logDir,
  116. logFile: filepath.Join(logDir, "latest.log"),
  117. useIPv6: mirror.UseIPv6,
  118. useIPv4: mirror.UseIPv4,
  119. interval: time.Duration(mirror.Interval) * time.Minute,
  120. }
  121. p, err := newRsyncProvider(rc)
  122. p.isMaster = isMaster
  123. if err != nil {
  124. panic(err)
  125. }
  126. provider = p
  127. case provTwoStageRsync:
  128. rc := twoStageRsyncConfig{
  129. name: mirror.Name,
  130. stage1Profile: mirror.Stage1Profile,
  131. upstreamURL: mirror.Upstream,
  132. rsyncCmd: mirror.Command,
  133. username: mirror.Username,
  134. password: mirror.Password,
  135. excludeFile: mirror.ExcludeFile,
  136. workingDir: mirrorDir,
  137. logDir: logDir,
  138. logFile: filepath.Join(logDir, "latest.log"),
  139. useIPv6: mirror.UseIPv6,
  140. interval: time.Duration(mirror.Interval) * time.Minute,
  141. }
  142. p, err := newTwoStageRsyncProvider(rc)
  143. p.isMaster = isMaster
  144. if err != nil {
  145. panic(err)
  146. }
  147. provider = p
  148. default:
  149. panic(errors.New("Invalid mirror provider"))
  150. }
  151. // Add Logging Hook
  152. provider.AddHook(newLogLimiter(provider))
  153. // Add ZFS Hook
  154. if cfg.ZFS.Enable {
  155. provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool))
  156. }
  157. // Add Docker Hook
  158. if cfg.Docker.Enable && len(mirror.DockerImage) > 0 {
  159. provider.AddHook(newDockerHook(provider, cfg.Docker, mirror))
  160. } else if cfg.Cgroup.Enable {
  161. // Add Cgroup Hook
  162. provider.AddHook(
  163. newCgroupHook(
  164. provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group,
  165. cfg.Cgroup.Subsystem, mirror.MemoryLimit,
  166. ),
  167. )
  168. }
  169. addHookFromCmdList := func(cmdList []string, execOn uint8) {
  170. if execOn != execOnSuccess && execOn != execOnFailure {
  171. panic("Invalid option for exec-on")
  172. }
  173. for _, cmd := range cmdList {
  174. h, err := newExecPostHook(provider, execOn, cmd)
  175. if err != nil {
  176. logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
  177. panic(err)
  178. }
  179. provider.AddHook(h)
  180. }
  181. }
  182. // ExecOnSuccess hook
  183. if len(mirror.ExecOnSuccess) > 0 {
  184. addHookFromCmdList(mirror.ExecOnSuccess, execOnSuccess)
  185. } else {
  186. addHookFromCmdList(cfg.Global.ExecOnSuccess, execOnSuccess)
  187. }
  188. addHookFromCmdList(mirror.ExecOnSuccessExtra, execOnSuccess)
  189. // ExecOnFailure hook
  190. if len(mirror.ExecOnFailure) > 0 {
  191. addHookFromCmdList(mirror.ExecOnFailure, execOnFailure)
  192. } else {
  193. addHookFromCmdList(cfg.Global.ExecOnFailure, execOnFailure)
  194. }
  195. addHookFromCmdList(mirror.ExecOnFailureExtra, execOnFailure)
  196. return provider
  197. }