provider.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package worker
  2. import (
  3. "bytes"
  4. "errors"
  5. "html/template"
  6. "path/filepath"
  7. "time"
  8. )
  9. // mirror provider is the wrapper of mirror jobs
  10. const (
  11. _WorkingDirKey = "working_dir"
  12. _LogDirKey = "log_dir"
  13. _LogFileKey = "log_file"
  14. )
  15. // A mirrorProvider instance
  16. type mirrorProvider interface {
  17. // name
  18. Name() string
  19. Upstream() string
  20. Type() providerEnum
  21. // Start then Wait
  22. Run(started chan empty) error
  23. // Start the job
  24. Start() error
  25. // Wait job to finish
  26. Wait() error
  27. // terminate mirror job
  28. Terminate() error
  29. // job hooks
  30. IsRunning() bool
  31. // Cgroup
  32. Cgroup() *cgroupHook
  33. // ZFS
  34. ZFS() *zfsHook
  35. // Docker
  36. Docker() *dockerHook
  37. AddHook(hook jobHook)
  38. Hooks() []jobHook
  39. Interval() time.Duration
  40. Retry() int
  41. Timeout() time.Duration
  42. WorkingDir() string
  43. LogDir() string
  44. LogFile() string
  45. IsMaster() bool
  46. DataSize() string
  47. // enter context
  48. EnterContext() *Context
  49. // exit context
  50. ExitContext() *Context
  51. // return context
  52. Context() *Context
  53. }
  54. // newProvider creates a mirrorProvider instance
  55. // using a mirrorCfg and the global cfg
  56. func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
  57. formatLogDir := func(logDir string, m mirrorConfig) string {
  58. tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
  59. if err != nil {
  60. panic(err)
  61. }
  62. var formatedLogDir bytes.Buffer
  63. tmpl.Execute(&formatedLogDir, m)
  64. return formatedLogDir.String()
  65. }
  66. logDir := mirror.LogDir
  67. mirrorDir := mirror.MirrorDir
  68. if logDir == "" {
  69. logDir = cfg.Global.LogDir
  70. }
  71. if mirrorDir == "" {
  72. mirrorDir = filepath.Join(
  73. cfg.Global.MirrorDir, mirror.MirrorSubDir, mirror.Name,
  74. )
  75. }
  76. if mirror.Interval == 0 {
  77. mirror.Interval = cfg.Global.Interval
  78. }
  79. if mirror.Retry == 0 {
  80. mirror.Retry = cfg.Global.Retry
  81. }
  82. if mirror.Timeout == 0 {
  83. mirror.Timeout = cfg.Global.Timeout
  84. }
  85. logDir = formatLogDir(logDir, mirror)
  86. // IsMaster
  87. isMaster := true
  88. if mirror.Role == "slave" {
  89. isMaster = false
  90. } else {
  91. if mirror.Role != "" && mirror.Role != "master" {
  92. logger.Warningf("Invalid role configuration for %s", mirror.Name)
  93. }
  94. }
  95. var provider mirrorProvider
  96. switch mirror.Provider {
  97. case provCommand:
  98. pc := cmdConfig{
  99. name: mirror.Name,
  100. upstreamURL: mirror.Upstream,
  101. command: mirror.Command,
  102. workingDir: mirrorDir,
  103. failOnMatch: mirror.FailOnMatch,
  104. sizePattern: mirror.SizePattern,
  105. logDir: logDir,
  106. logFile: filepath.Join(logDir, "latest.log"),
  107. interval: time.Duration(mirror.Interval) * time.Minute,
  108. retry: mirror.Retry,
  109. timeout: time.Duration(mirror.Timeout) * time.Second,
  110. env: mirror.Env,
  111. }
  112. p, err := newCmdProvider(pc)
  113. if err != nil {
  114. panic(err)
  115. }
  116. p.isMaster = isMaster
  117. provider = p
  118. case provRsync:
  119. rc := rsyncConfig{
  120. name: mirror.Name,
  121. upstreamURL: mirror.Upstream,
  122. rsyncCmd: mirror.Command,
  123. username: mirror.Username,
  124. password: mirror.Password,
  125. excludeFile: mirror.ExcludeFile,
  126. extraOptions: mirror.RsyncOptions,
  127. rsyncNeverTimeout: mirror.RsyncNoTimeo,
  128. rsyncTimeoutValue: mirror.RsyncTimeout,
  129. globalOptions: cfg.Global.RsyncOptions,
  130. overriddenOptions: mirror.RsyncOverride,
  131. useOverrideOnly: mirror.RsyncOverrideOnly,
  132. rsyncEnv: mirror.Env,
  133. workingDir: mirrorDir,
  134. logDir: logDir,
  135. logFile: filepath.Join(logDir, "latest.log"),
  136. useIPv6: mirror.UseIPv6,
  137. useIPv4: mirror.UseIPv4,
  138. interval: time.Duration(mirror.Interval) * time.Minute,
  139. retry: mirror.Retry,
  140. timeout: time.Duration(mirror.Timeout) * time.Second,
  141. }
  142. p, err := newRsyncProvider(rc)
  143. if err != nil {
  144. panic(err)
  145. }
  146. p.isMaster = isMaster
  147. provider = p
  148. case provTwoStageRsync:
  149. rc := twoStageRsyncConfig{
  150. name: mirror.Name,
  151. stage1Profile: mirror.Stage1Profile,
  152. upstreamURL: mirror.Upstream,
  153. rsyncCmd: mirror.Command,
  154. username: mirror.Username,
  155. password: mirror.Password,
  156. excludeFile: mirror.ExcludeFile,
  157. extraOptions: mirror.RsyncOptions,
  158. rsyncNeverTimeout: mirror.RsyncNoTimeo,
  159. rsyncTimeoutValue: mirror.RsyncTimeout,
  160. rsyncEnv: mirror.Env,
  161. workingDir: mirrorDir,
  162. logDir: logDir,
  163. logFile: filepath.Join(logDir, "latest.log"),
  164. useIPv6: mirror.UseIPv6,
  165. useIPv4: mirror.UseIPv4,
  166. interval: time.Duration(mirror.Interval) * time.Minute,
  167. retry: mirror.Retry,
  168. timeout: time.Duration(mirror.Timeout) * time.Second,
  169. }
  170. p, err := newTwoStageRsyncProvider(rc)
  171. if err != nil {
  172. panic(err)
  173. }
  174. p.isMaster = isMaster
  175. provider = p
  176. default:
  177. panic(errors.New("Invalid mirror provider"))
  178. }
  179. // Add Logging Hook
  180. provider.AddHook(newLogLimiter(provider))
  181. // Add ZFS Hook
  182. if cfg.ZFS.Enable {
  183. provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool))
  184. }
  185. // Add Btrfs Snapshot Hook
  186. if cfg.BtrfsSnapshot.Enable {
  187. provider.AddHook(newBtrfsSnapshotHook(provider, cfg.BtrfsSnapshot.SnapshotPath, mirror))
  188. }
  189. // Add Docker Hook
  190. if cfg.Docker.Enable && len(mirror.DockerImage) > 0 {
  191. provider.AddHook(newDockerHook(provider, cfg.Docker, mirror))
  192. } else if cfg.Cgroup.Enable {
  193. // Add Cgroup Hook
  194. provider.AddHook(
  195. newCgroupHook(
  196. provider, cfg.Cgroup, mirror.MemoryLimit,
  197. ),
  198. )
  199. }
  200. addHookFromCmdList := func(cmdList []string, execOn uint8) {
  201. if execOn != execOnSuccess && execOn != execOnFailure {
  202. panic("Invalid option for exec-on")
  203. }
  204. for _, cmd := range cmdList {
  205. h, err := newExecPostHook(provider, execOn, cmd)
  206. if err != nil {
  207. logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
  208. panic(err)
  209. }
  210. provider.AddHook(h)
  211. }
  212. }
  213. // ExecOnSuccess hook
  214. if len(mirror.ExecOnSuccess) > 0 {
  215. addHookFromCmdList(mirror.ExecOnSuccess, execOnSuccess)
  216. } else {
  217. addHookFromCmdList(cfg.Global.ExecOnSuccess, execOnSuccess)
  218. }
  219. addHookFromCmdList(mirror.ExecOnSuccessExtra, execOnSuccess)
  220. // ExecOnFailure hook
  221. if len(mirror.ExecOnFailure) > 0 {
  222. addHookFromCmdList(mirror.ExecOnFailure, execOnFailure)
  223. } else {
  224. addHookFromCmdList(cfg.Global.ExecOnFailure, execOnFailure)
  225. }
  226. addHookFromCmdList(mirror.ExecOnFailureExtra, execOnFailure)
  227. return provider
  228. }