123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- package worker
- import (
- "errors"
- "fmt"
- "regexp"
- "time"
- "github.com/anmitsu/go-shlex"
- "github.com/tuna/tunasync/internal"
- )
// cmdConfig carries the settings used to build a cmdProvider.
type cmdConfig struct {
	// name is the provider name handed to the embedded baseProvider.
	name string
	// upstreamURL is reported by Upstream and exported to the command as
	// TUNASYNC_UPSTREAM_URL.
	upstreamURL string
	// command is the raw command line; newCmdProvider splits it with shlex.
	command string
	// workingDir, logDir and logFile are stored in the provider context
	// by newCmdProvider.
	workingDir string
	logDir     string
	logFile    string
	// interval is passed through to the baseProvider unchanged.
	interval time.Duration
	// retry is the retry count; zero is replaced with defaultMaxRetry.
	retry int
	// env holds extra environment variables; entries here override the
	// TUNASYNC_* defaults assembled in Start.
	env map[string]string
	// failOnMatch is an optional regexp source; when non-empty it is
	// compiled by newCmdProvider and applied to the log file after a run.
	failOnMatch string
	// sizePattern is an optional regexp source; when non-empty it is
	// compiled by newCmdProvider and used to extract the data size from
	// the log file.
	sizePattern string
}
- type cmdProvider struct {
- baseProvider
- cmdConfig
- command []string
- dataSize string
- failOnMatch *regexp.Regexp
- sizePattern *regexp.Regexp
- }
- func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
- // TODO: check config options
- if c.retry == 0 {
- c.retry = defaultMaxRetry
- }
- provider := &cmdProvider{
- baseProvider: baseProvider{
- name: c.name,
- ctx: NewContext(),
- interval: c.interval,
- retry: c.retry,
- },
- cmdConfig: c,
- }
- provider.ctx.Set(_WorkingDirKey, c.workingDir)
- provider.ctx.Set(_LogDirKey, c.logDir)
- provider.ctx.Set(_LogFileKey, c.logFile)
- cmd, err := shlex.Split(c.command, true)
- if err != nil {
- return nil, err
- }
- provider.command = cmd
- if len(c.failOnMatch) > 0 {
- var err error
- failOnMatch, err := regexp.Compile(c.failOnMatch)
- if err != nil {
- return nil, errors.New("fail-on-match regexp error: " + err.Error())
- }
- provider.failOnMatch = failOnMatch
- }
- if len(c.sizePattern) > 0 {
- var err error
- sizePattern, err := regexp.Compile(c.sizePattern)
- if err != nil {
- return nil, errors.New("size-pattern regexp error: " + err.Error())
- }
- provider.sizePattern = sizePattern
- }
- return provider, nil
- }
- func (p *cmdProvider) Type() providerEnum {
- return provCommand
- }
- func (p *cmdProvider) Upstream() string {
- return p.upstreamURL
- }
- func (p *cmdProvider) DataSize() string {
- return p.dataSize
- }
- func (p *cmdProvider) Run() error {
- p.dataSize = ""
- if err := p.Start(); err != nil {
- return err
- }
- if err := p.Wait(); err != nil {
- return err
- }
- if p.failOnMatch != nil {
- matches, err := internal.FindAllSubmatchInFile(p.LogFile(), p.failOnMatch)
- fmt.Printf("FindAllSubmatchInFile: %q\n", matches)
- if err != nil {
- return err
- }
- if len(matches) != 0 {
- logger.Debug("Fail-on-match: %r", matches)
- return fmt.Errorf("Fail-on-match regexp found %d matches", len(matches))
- }
- }
- if p.sizePattern != nil {
- p.dataSize = internal.ExtractSizeFromLog(p.LogFile(), p.sizePattern)
- }
- return nil
- }
- func (p *cmdProvider) Start() error {
- p.Lock()
- defer p.Unlock()
- if p.IsRunning() {
- return errors.New("provider is currently running")
- }
- env := map[string]string{
- "TUNASYNC_MIRROR_NAME": p.Name(),
- "TUNASYNC_WORKING_DIR": p.WorkingDir(),
- "TUNASYNC_UPSTREAM_URL": p.upstreamURL,
- "TUNASYNC_LOG_DIR": p.LogDir(),
- "TUNASYNC_LOG_FILE": p.LogFile(),
- }
- for k, v := range p.env {
- env[k] = v
- }
- p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env)
- if err := p.prepareLogFile(false); err != nil {
- return err
- }
- if err := p.cmd.Start(); err != nil {
- return err
- }
- p.isRunning.Store(true)
- return nil
- }
|