provider.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package worker
  2. import (
  3. "bytes"
  4. "errors"
  5. "html/template"
  6. "path/filepath"
  7. "time"
  8. )
// A mirror provider is the wrapper around a mirror job.
  10. const (
  11. _WorkingDirKey = "working_dir"
  12. _LogDirKey = "log_dir"
  13. _LogFileKey = "log_file"
  14. )
  15. // A mirrorProvider instance
  16. type mirrorProvider interface {
  17. // name
  18. Name() string
  19. Upstream() string
  20. Type() providerEnum
  21. // Start then Wait
  22. Run(started chan empty) error
  23. // Start the job
  24. Start() error
  25. // Wait job to finish
  26. Wait() error
  27. // terminate mirror job
  28. Terminate() error
  29. // job hooks
  30. IsRunning() bool
  31. // Cgroup
  32. Cgroup() *cgroupHook
  33. // ZFS
  34. ZFS() *zfsHook
  35. // Docker
  36. Docker() *dockerHook
  37. AddHook(hook jobHook)
  38. Hooks() []jobHook
  39. Interval() time.Duration
  40. Retry() int
  41. Timeout() time.Duration
  42. WorkingDir() string
  43. LogDir() string
  44. LogFile() string
  45. IsMaster() bool
  46. DataSize() string
  47. // enter context
  48. EnterContext() *Context
  49. // exit context
  50. ExitContext() *Context
  51. // return context
  52. Context() *Context
  53. // set in newMirrorProvider, used by cmdJob.Wait
  54. SetSuccessExitCodes(codes []int)
  55. GetSuccessExitCodes() []int
  56. }
  57. // newProvider creates a mirrorProvider instance
  58. // using a mirrorCfg and the global cfg
  59. func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
  60. formatLogDir := func(logDir string, m mirrorConfig) string {
  61. tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
  62. if err != nil {
  63. panic(err)
  64. }
  65. var formatedLogDir bytes.Buffer
  66. tmpl.Execute(&formatedLogDir, m)
  67. return formatedLogDir.String()
  68. }
  69. logDir := mirror.LogDir
  70. mirrorDir := mirror.MirrorDir
  71. if logDir == "" {
  72. logDir = cfg.Global.LogDir
  73. }
  74. if mirrorDir == "" {
  75. mirrorDir = filepath.Join(
  76. cfg.Global.MirrorDir, mirror.MirrorSubDir, mirror.Name,
  77. )
  78. }
  79. if mirror.Interval == 0 {
  80. mirror.Interval = cfg.Global.Interval
  81. }
  82. if mirror.Retry == 0 {
  83. mirror.Retry = cfg.Global.Retry
  84. }
  85. if mirror.Timeout == 0 {
  86. mirror.Timeout = cfg.Global.Timeout
  87. }
  88. logDir = formatLogDir(logDir, mirror)
  89. // IsMaster
  90. isMaster := true
  91. if mirror.Role == "slave" {
  92. isMaster = false
  93. } else {
  94. if mirror.Role != "" && mirror.Role != "master" {
  95. logger.Warningf("Invalid role configuration for %s", mirror.Name)
  96. }
  97. }
  98. var provider mirrorProvider
  99. switch mirror.Provider {
  100. case provCommand:
  101. pc := cmdConfig{
  102. name: mirror.Name,
  103. upstreamURL: mirror.Upstream,
  104. command: mirror.Command,
  105. workingDir: mirrorDir,
  106. failOnMatch: mirror.FailOnMatch,
  107. sizePattern: mirror.SizePattern,
  108. logDir: logDir,
  109. logFile: filepath.Join(logDir, "latest.log"),
  110. interval: time.Duration(mirror.Interval) * time.Minute,
  111. retry: mirror.Retry,
  112. timeout: time.Duration(mirror.Timeout) * time.Second,
  113. env: mirror.Env,
  114. }
  115. p, err := newCmdProvider(pc)
  116. if err != nil {
  117. panic(err)
  118. }
  119. p.isMaster = isMaster
  120. provider = p
  121. case provRsync:
  122. rc := rsyncConfig{
  123. name: mirror.Name,
  124. upstreamURL: mirror.Upstream,
  125. rsyncCmd: mirror.Command,
  126. username: mirror.Username,
  127. password: mirror.Password,
  128. excludeFile: mirror.ExcludeFile,
  129. extraOptions: mirror.RsyncOptions,
  130. rsyncNeverTimeout: mirror.RsyncNoTimeo,
  131. rsyncTimeoutValue: mirror.RsyncTimeout,
  132. globalOptions: cfg.Global.RsyncOptions,
  133. overriddenOptions: mirror.RsyncOverride,
  134. useOverrideOnly: mirror.RsyncOverrideOnly,
  135. rsyncEnv: mirror.Env,
  136. workingDir: mirrorDir,
  137. logDir: logDir,
  138. logFile: filepath.Join(logDir, "latest.log"),
  139. useIPv6: mirror.UseIPv6,
  140. useIPv4: mirror.UseIPv4,
  141. interval: time.Duration(mirror.Interval) * time.Minute,
  142. retry: mirror.Retry,
  143. timeout: time.Duration(mirror.Timeout) * time.Second,
  144. }
  145. p, err := newRsyncProvider(rc)
  146. if err != nil {
  147. panic(err)
  148. }
  149. p.isMaster = isMaster
  150. provider = p
  151. case provTwoStageRsync:
  152. rc := twoStageRsyncConfig{
  153. name: mirror.Name,
  154. stage1Profile: mirror.Stage1Profile,
  155. upstreamURL: mirror.Upstream,
  156. rsyncCmd: mirror.Command,
  157. username: mirror.Username,
  158. password: mirror.Password,
  159. excludeFile: mirror.ExcludeFile,
  160. extraOptions: mirror.RsyncOptions,
  161. rsyncNeverTimeout: mirror.RsyncNoTimeo,
  162. rsyncTimeoutValue: mirror.RsyncTimeout,
  163. rsyncEnv: mirror.Env,
  164. workingDir: mirrorDir,
  165. logDir: logDir,
  166. logFile: filepath.Join(logDir, "latest.log"),
  167. useIPv6: mirror.UseIPv6,
  168. useIPv4: mirror.UseIPv4,
  169. interval: time.Duration(mirror.Interval) * time.Minute,
  170. retry: mirror.Retry,
  171. timeout: time.Duration(mirror.Timeout) * time.Second,
  172. }
  173. p, err := newTwoStageRsyncProvider(rc)
  174. if err != nil {
  175. panic(err)
  176. }
  177. p.isMaster = isMaster
  178. provider = p
  179. default:
  180. panic(errors.New("Invalid mirror provider"))
  181. }
  182. // Add Logging Hook
  183. provider.AddHook(newLogLimiter(provider))
  184. // Add ZFS Hook
  185. if cfg.ZFS.Enable {
  186. provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool))
  187. }
  188. // Add Btrfs Snapshot Hook
  189. if cfg.BtrfsSnapshot.Enable {
  190. provider.AddHook(newBtrfsSnapshotHook(provider, cfg.BtrfsSnapshot.SnapshotPath, mirror))
  191. }
  192. // Add Docker Hook
  193. if cfg.Docker.Enable && len(mirror.DockerImage) > 0 {
  194. provider.AddHook(newDockerHook(provider, cfg.Docker, mirror))
  195. } else if cfg.Cgroup.Enable {
  196. // Add Cgroup Hook
  197. provider.AddHook(
  198. newCgroupHook(
  199. provider, cfg.Cgroup, mirror.MemoryLimit,
  200. ),
  201. )
  202. }
  203. addHookFromCmdList := func(cmdList []string, execOn uint8) {
  204. if execOn != execOnSuccess && execOn != execOnFailure {
  205. panic("Invalid option for exec-on")
  206. }
  207. for _, cmd := range cmdList {
  208. h, err := newExecPostHook(provider, execOn, cmd)
  209. if err != nil {
  210. logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
  211. panic(err)
  212. }
  213. provider.AddHook(h)
  214. }
  215. }
  216. // ExecOnSuccess hook
  217. if len(mirror.ExecOnSuccess) > 0 {
  218. addHookFromCmdList(mirror.ExecOnSuccess, execOnSuccess)
  219. } else {
  220. addHookFromCmdList(cfg.Global.ExecOnSuccess, execOnSuccess)
  221. }
  222. addHookFromCmdList(mirror.ExecOnSuccessExtra, execOnSuccess)
  223. // ExecOnFailure hook
  224. if len(mirror.ExecOnFailure) > 0 {
  225. addHookFromCmdList(mirror.ExecOnFailure, execOnFailure)
  226. } else {
  227. addHookFromCmdList(cfg.Global.ExecOnFailure, execOnFailure)
  228. }
  229. addHookFromCmdList(mirror.ExecOnFailureExtra, execOnFailure)
  230. successExitCodes := []int{}
  231. if cfg.Global.SuccessExitCodes != nil {
  232. successExitCodes = append(successExitCodes, cfg.Global.SuccessExitCodes...)
  233. }
  234. if mirror.SuccessExitCodes != nil {
  235. successExitCodes = append(successExitCodes, mirror.SuccessExitCodes...)
  236. }
  237. if len(successExitCodes) > 0 {
  238. logger.Infof("Non-zero success exit codes set for mirror %s: %v", mirror.Name, successExitCodes)
  239. provider.SetSuccessExitCodes(successExitCodes)
  240. }
  241. return provider
  242. }