2
0

provider.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package worker
  2. import (
  3. "bytes"
  4. "errors"
  5. "html/template"
  6. "path/filepath"
  7. "time"
  8. )
  9. // mirror provider is the wrapper of mirror jobs
  10. const (
  11. _WorkingDirKey = "working_dir"
  12. _LogDirKey = "log_dir"
  13. _LogFileKey = "log_file"
  14. )
  15. // A mirrorProvider instance
  16. type mirrorProvider interface {
  17. // name
  18. Name() string
  19. Upstream() string
  20. Type() providerEnum
  21. // run mirror job in background
  22. Run() error
  23. // run mirror job in background
  24. Start() error
  25. // Wait job to finish
  26. Wait() error
  27. // terminate mirror job
  28. Terminate() error
  29. // job hooks
  30. IsRunning() bool
  31. // Cgroup
  32. Cgroup() *cgroupHook
  33. // ZFS
  34. ZFS() *zfsHook
  35. AddHook(hook jobHook)
  36. Hooks() []jobHook
  37. Interval() time.Duration
  38. WorkingDir() string
  39. LogDir() string
  40. LogFile() string
  41. IsMaster() bool
  42. // enter context
  43. EnterContext() *Context
  44. // exit context
  45. ExitContext() *Context
  46. // return context
  47. Context() *Context
  48. }
  49. // newProvider creates a mirrorProvider instance
  50. // using a mirrorCfg and the global cfg
  51. func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
  52. formatLogDir := func(logDir string, m mirrorConfig) string {
  53. tmpl, err := template.New("logDirTmpl-" + m.Name).Parse(logDir)
  54. if err != nil {
  55. panic(err)
  56. }
  57. var formatedLogDir bytes.Buffer
  58. tmpl.Execute(&formatedLogDir, m)
  59. return formatedLogDir.String()
  60. }
  61. logDir := mirror.LogDir
  62. mirrorDir := mirror.MirrorDir
  63. if logDir == "" {
  64. logDir = cfg.Global.LogDir
  65. }
  66. if mirrorDir == "" {
  67. mirrorDir = filepath.Join(
  68. cfg.Global.MirrorDir, mirror.Name,
  69. )
  70. }
  71. if mirror.Interval == 0 {
  72. mirror.Interval = cfg.Global.Interval
  73. }
  74. logDir = formatLogDir(logDir, mirror)
  75. // IsMaster
  76. isMaster := true
  77. if mirror.Role == "slave" {
  78. isMaster = false
  79. } else {
  80. if mirror.Role != "" && mirror.Role != "master" {
  81. logger.Warningf("Invalid role configuration for %s", mirror.Name)
  82. }
  83. }
  84. var provider mirrorProvider
  85. switch mirror.Provider {
  86. case provCommand:
  87. pc := cmdConfig{
  88. name: mirror.Name,
  89. upstreamURL: mirror.Upstream,
  90. command: mirror.Command,
  91. workingDir: mirrorDir,
  92. logDir: logDir,
  93. logFile: filepath.Join(logDir, "latest.log"),
  94. interval: time.Duration(mirror.Interval) * time.Minute,
  95. env: mirror.Env,
  96. }
  97. p, err := newCmdProvider(pc)
  98. p.isMaster = isMaster
  99. if err != nil {
  100. panic(err)
  101. }
  102. provider = p
  103. case provRsync:
  104. rc := rsyncConfig{
  105. name: mirror.Name,
  106. upstreamURL: mirror.Upstream,
  107. rsyncCmd: mirror.Command,
  108. username: mirror.Username,
  109. password: mirror.Password,
  110. excludeFile: mirror.ExcludeFile,
  111. workingDir: mirrorDir,
  112. logDir: logDir,
  113. logFile: filepath.Join(logDir, "latest.log"),
  114. useIPv6: mirror.UseIPv6,
  115. interval: time.Duration(mirror.Interval) * time.Minute,
  116. }
  117. p, err := newRsyncProvider(rc)
  118. p.isMaster = isMaster
  119. if err != nil {
  120. panic(err)
  121. }
  122. provider = p
  123. case provTwoStageRsync:
  124. rc := twoStageRsyncConfig{
  125. name: mirror.Name,
  126. stage1Profile: mirror.Stage1Profile,
  127. upstreamURL: mirror.Upstream,
  128. rsyncCmd: mirror.Command,
  129. username: mirror.Username,
  130. password: mirror.Password,
  131. excludeFile: mirror.ExcludeFile,
  132. workingDir: mirrorDir,
  133. logDir: logDir,
  134. logFile: filepath.Join(logDir, "latest.log"),
  135. useIPv6: mirror.UseIPv6,
  136. interval: time.Duration(mirror.Interval) * time.Minute,
  137. }
  138. p, err := newTwoStageRsyncProvider(rc)
  139. p.isMaster = isMaster
  140. if err != nil {
  141. panic(err)
  142. }
  143. provider = p
  144. default:
  145. panic(errors.New("Invalid mirror provider"))
  146. }
  147. // Add Logging Hook
  148. provider.AddHook(newLogLimiter(provider))
  149. // Add ZFS Hook
  150. if cfg.ZFS.Enable {
  151. provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool))
  152. }
  153. // Add Cgroup Hook
  154. if cfg.Cgroup.Enable {
  155. provider.AddHook(
  156. newCgroupHook(provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group),
  157. )
  158. }
  159. addHookFromCmdList := func(cmdList []string, execOn uint8) {
  160. if execOn != execOnSuccess && execOn != execOnFailure {
  161. panic("Invalid option for exec-on")
  162. }
  163. for _, cmd := range cmdList {
  164. h, err := newExecPostHook(provider, execOn, cmd)
  165. if err != nil {
  166. logger.Errorf("Error initializing mirror %s: %s", mirror.Name, err.Error())
  167. panic(err)
  168. }
  169. provider.AddHook(h)
  170. }
  171. }
  172. // ExecOnSuccess hook
  173. if len(mirror.ExecOnSuccess) > 0 {
  174. addHookFromCmdList(mirror.ExecOnSuccess, execOnSuccess)
  175. } else {
  176. addHookFromCmdList(cfg.Global.ExecOnSuccess, execOnSuccess)
  177. }
  178. addHookFromCmdList(mirror.ExecOnSuccessExtra, execOnSuccess)
  179. // ExecOnFailure hook
  180. if len(mirror.ExecOnFailure) > 0 {
  181. addHookFromCmdList(mirror.ExecOnFailure, execOnFailure)
  182. } else {
  183. addHookFromCmdList(cfg.Global.ExecOnFailure, execOnFailure)
  184. }
  185. addHookFromCmdList(mirror.ExecOnFailureExtra, execOnFailure)
  186. return provider
  187. }