123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- package worker
- import (
- "os"
- "sync"
- "sync/atomic"
- "time"
- )
// A mirror provider is the wrapper around a mirror job.

// providerType is a uint8-backed tag type.
// NOTE(review): no values of this type are declared in this section;
// presumably it enumerates the kinds of provider — confirm where it is used.
type providerType uint8
// Keys under which the provider's directories and log file are stored in
// its Context; WorkingDir, LogDir and LogFile look them up.
const (
	// _WorkingDirKey is the context key for the working directory.
	_WorkingDirKey = "working_dir"
	// _LogDirKey is the context key for the log directory.
	_LogDirKey = "log_dir"
	// _LogFileKey is the context key for the log file path.
	_LogFileKey = "log_file"
)
- // A mirrorProvider instance
- type mirrorProvider interface {
- // name
- Name() string
- Upstream() string
- // run mirror job in background
- Run() error
- // run mirror job in background
- Start() error
- // Wait job to finish
- Wait() error
- // terminate mirror job
- Terminate() error
- // job hooks
- IsRunning() bool
- // Cgroup
- Cgroup() *cgroupHook
- AddHook(hook jobHook)
- Hooks() []jobHook
- Interval() time.Duration
- WorkingDir() string
- LogDir() string
- LogFile() string
- IsMaster() bool
- // enter context
- EnterContext() *Context
- // exit context
- ExitContext() *Context
- // return context
- Context() *Context
- }
- type baseProvider struct {
- sync.Mutex
- ctx *Context
- name string
- interval time.Duration
- isMaster bool
- cmd *cmdJob
- isRunning atomic.Value
- logFile *os.File
- cgroup *cgroupHook
- hooks []jobHook
- }
- func (p *baseProvider) Name() string {
- return p.name
- }
- func (p *baseProvider) EnterContext() *Context {
- p.ctx = p.ctx.Enter()
- return p.ctx
- }
- func (p *baseProvider) ExitContext() *Context {
- p.ctx, _ = p.ctx.Exit()
- return p.ctx
- }
- func (p *baseProvider) Context() *Context {
- return p.ctx
- }
- func (p *baseProvider) Interval() time.Duration {
- // logger.Debug("interval for %s: %v", p.Name(), p.interval)
- return p.interval
- }
- func (p *baseProvider) IsMaster() bool {
- return p.isMaster
- }
- func (p *baseProvider) WorkingDir() string {
- if v, ok := p.ctx.Get(_WorkingDirKey); ok {
- if s, ok := v.(string); ok {
- return s
- }
- }
- panic("working dir is impossible to be non-exist")
- }
- func (p *baseProvider) LogDir() string {
- if v, ok := p.ctx.Get(_LogDirKey); ok {
- if s, ok := v.(string); ok {
- return s
- }
- }
- panic("log dir is impossible to be unavailable")
- }
- func (p *baseProvider) LogFile() string {
- if v, ok := p.ctx.Get(_LogFileKey); ok {
- if s, ok := v.(string); ok {
- return s
- }
- }
- panic("log dir is impossible to be unavailable")
- }
- func (p *baseProvider) AddHook(hook jobHook) {
- if cg, ok := hook.(*cgroupHook); ok {
- p.cgroup = cg
- }
- p.hooks = append(p.hooks, hook)
- }
- func (p *baseProvider) Hooks() []jobHook {
- return p.hooks
- }
- func (p *baseProvider) Cgroup() *cgroupHook {
- return p.cgroup
- }
- func (p *baseProvider) prepareLogFile() error {
- if p.LogFile() == "/dev/null" {
- p.cmd.SetLogFile(nil)
- return nil
- }
- if p.logFile == nil {
- logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
- if err != nil {
- logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
- return err
- }
- p.logFile = logFile
- }
- p.cmd.SetLogFile(p.logFile)
- return nil
- }
- func (p *baseProvider) Run() error {
- panic("Not Implemented")
- }
- func (p *baseProvider) Start() error {
- panic("Not Implemented")
- }
- func (p *baseProvider) IsRunning() bool {
- isRunning, _ := p.isRunning.Load().(bool)
- return isRunning
- }
- func (p *baseProvider) Wait() error {
- defer func() {
- p.Lock()
- p.isRunning.Store(false)
- if p.logFile != nil {
- p.logFile.Close()
- p.logFile = nil
- }
- p.Unlock()
- }()
- return p.cmd.Wait()
- }
- func (p *baseProvider) Terminate() error {
- logger.Debugf("terminating provider: %s", p.Name())
- if !p.IsRunning() {
- return nil
- }
- p.Lock()
- if p.logFile != nil {
- p.logFile.Close()
- p.logFile = nil
- }
- p.Unlock()
- err := p.cmd.Terminate()
- p.isRunning.Store(false)
- return err
- }
|