2
0

provider.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package worker
  2. import (
  3. "bytes"
  4. "errors"
  5. "html/template"
  6. "path/filepath"
  7. "time"
  8. )
  9. // mirror provider is the wrapper of mirror jobs
  10. const (
  11. _WorkingDirKey = "working_dir"
  12. _LogDirKey = "log_dir"
  13. _LogFileKey = "log_file"
  14. )
  15. // A mirrorProvider instance
  16. type mirrorProvider interface {
  17. // name
  18. Name() string
  19. Upstream() string
  20. Type() providerEnum
  21. // run mirror job in background
  22. Run() error
  23. // run mirror job in background
  24. Start() error
  25. // Wait job to finish
  26. Wait() error
  27. // terminate mirror job
  28. Terminate() error
  29. // job hooks
  30. IsRunning() bool
  31. // Cgroup
  32. Cgroup() *cgroupHook
  33. // ZFS
  34. ZFS() *zfsHook
  35. // Docker
  36. Docker() *dockerHook
  37. AddHook(hook jobHook)
  38. Hooks() []jobHook
  39. Interval() time.Duration
  40. Retry() int
  41. WorkingDir() string
  42. LogDir() string
  43. LogFile() string
  44. IsMaster() bool
  45. DataSize() string
  46. // enter context
  47. EnterContext() *Context
  48. // exit context
  49. ExitContext() *Context
  50. // return context
  51. Context() *Context
  52. }
  53. // newProvider creates a mirrorProvider instance
  54. // using a mirrorCfg and the global cfg
  55. func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
  56. formatLogDir := func(logDir string, m mirrorConfig) string {
  57. tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
  58. if err != nil {
  59. panic(err)
  60. }
  61. var formatedLogDir bytes.Buffer
  62. tmpl.Execute(&formatedLogDir, m)
  63. return formatedLogDir.String()
  64. }
  65. logDir := mirror.LogDir
  66. mirrorDir := mirror.MirrorDir
  67. if logDir == "" {
  68. logDir = cfg.Global.LogDir
  69. }
  70. if mirrorDir == "" {
  71. mirrorDir = filepath.Join(
  72. cfg.Global.MirrorDir, mirror.MirrorSubDir, mirror.Name,
  73. )
  74. }
  75. if mirror.Interval == 0 {
  76. mirror.Interval = cfg.Global.Interval
  77. }
  78. if mirror.Retry == 0 {
  79. mirror.Retry = cfg.Global.Retry
  80. }
  81. logDir = formatLogDir(logDir, mirror)
  82. // IsMaster
  83. isMaster := true
  84. if mirror.Role == "slave" {
  85. isMaster = false
  86. } else {
  87. if mirror.Role != "" && mirror.Role != "master" {
  88. logger.Warningf("Invalid role configuration for %s", mirror.Name)
  89. }
  90. }
  91. var provider mirrorProvider
  92. switch mirror.Provider {
  93. case provCommand:
  94. pc := cmdConfig{
  95. name: mirror.Name,
  96. upstreamURL: mirror.Upstream,
  97. command: mirror.Command,
  98. workingDir: mirrorDir,
  99. failOnMatch: mirror.FailOnMatch,
  100. sizePattern: mirror.SizePattern,
  101. logDir: logDir,
  102. logFile: filepath.Join(logDir, "latest.log"),
  103. interval: time.Duration(mirror.Interval) * time.Minute,
  104. retry: mirror.Retry,
  105. env: mirror.Env,
  106. }
  107. p, err := newCmdProvider(pc)
  108. if err != nil {
  109. panic(err)
  110. }
  111. p.isMaster = isMaster
  112. provider = p
  113. case provRsync:
  114. rc := rsyncConfig{
  115. name: mirror.Name,
  116. upstreamURL: mirror.Upstream,
  117. rsyncCmd: mirror.Command,
  118. username: mirror.Username,
  119. password: mirror.Password,
  120. excludeFile: mirror.ExcludeFile,
  121. extraOptions: mirror.RsyncOptions,
  122. overriddenOptions: mirror.RsyncOverride,
  123. rsyncEnv: mirror.Env,
  124. workingDir: mirrorDir,
  125. logDir: logDir,
  126. logFile: filepath.Join(logDir, "latest.log"),
  127. useIPv6: mirror.UseIPv6,
  128. useIPv4: mirror.UseIPv4,
  129. interval: time.Duration(mirror.Interval) * time.Minute,
  130. retry: mirror.Retry,
  131. }
  132. p, err := newRsyncProvider(rc)
  133. if err != nil {
  134. panic(err)
  135. }
  136. p.isMaster = isMaster
  137. provider = p
  138. case provTwoStageRsync:
  139. rc := twoStageRsyncConfig{
  140. name: mirror.Name,
  141. stage1Profile: mirror.Stage1Profile,
  142. upstreamURL: mirror.Upstream,
  143. rsyncCmd: mirror.Command,
  144. username: mirror.Username,
  145. password: mirror.Password,
  146. excludeFile: mirror.ExcludeFile,
  147. extraOptions: mirror.RsyncOptions,
  148. rsyncEnv: mirror.Env,
  149. workingDir: mirrorDir,
  150. logDir: logDir,
  151. logFile: filepath.Join(logDir, "latest.log"),
  152. useIPv6: mirror.UseIPv6,
  153. interval: time.Duration(mirror.Interval) * time.Minute,
  154. retry: mirror.Retry,
  155. }
  156. p, err := newTwoStageRsyncProvider(rc)
  157. if err != nil {
  158. panic(err)
  159. }
  160. p.isMaster = isMaster
  161. provider = p
  162. default:
  163. panic(errors.New("Invalid mirror provider"))
  164. }
  165. // Add Logging Hook
  166. provider.AddHook(newLogLimiter(provider))
  167. // Add ZFS Hook
  168. if cfg.ZFS.Enable {
  169. provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool))
  170. }
  171. // Add Btrfs Snapshot Hook
  172. if cfg.BtrfsSnapshot.Enable {
  173. provider.AddHook(newBtrfsSnapshotHook(provider, cfg.BtrfsSnapshot.SnapshotPath, mirror))
  174. }
  175. // Add Docker Hook
  176. if cfg.Docker.Enable && len(mirror.DockerImage) > 0 {
  177. provider.AddHook(newDockerHook(provider, cfg.Docker, mirror))
  178. } else if cfg.Cgroup.Enable {
  179. // Add Cgroup Hook
  180. provider.AddHook(
  181. newCgroupHook(
  182. provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group,
  183. cfg.Cgroup.Subsystem, mirror.MemoryLimit,
  184. ),
  185. )
  186. }
  187. addHookFromCmdList := func(cmdList []string, execOn uint8) {
  188. if execOn != execOnSuccess && execOn != execOnFailure {
  189. panic("Invalid option for exec-on")
  190. }
  191. for _, cmd := range cmdList {
  192. h, err := newExecPostHook(provider, execOn, cmd)
  193. if err != nil {
  194. logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
  195. panic(err)
  196. }
  197. provider.AddHook(h)
  198. }
  199. }
  200. // ExecOnSuccess hook
  201. if len(mirror.ExecOnSuccess) > 0 {
  202. addHookFromCmdList(mirror.ExecOnSuccess, execOnSuccess)
  203. } else {
  204. addHookFromCmdList(cfg.Global.ExecOnSuccess, execOnSuccess)
  205. }
  206. addHookFromCmdList(mirror.ExecOnSuccessExtra, execOnSuccess)
  207. // ExecOnFailure hook
  208. if len(mirror.ExecOnFailure) > 0 {
  209. addHookFromCmdList(mirror.ExecOnFailure, execOnFailure)
  210. } else {
  211. addHookFromCmdList(cfg.Global.ExecOnFailure, execOnFailure)
  212. }
  213. addHookFromCmdList(mirror.ExecOnFailureExtra, execOnFailure)
  214. return provider
  215. }