Kaynağa Gözat

Merge pull request #54 from tuna/dev

Dev
bigeagle 8 yıl önce
ebeveyn
işleme
81a15e7dd1
6 değiştirilmiş dosya ile 83 ekleme ve 15 silme
  1. 1 1
      manager/db.go
  2. 11 3
      worker/base_provider.go
  3. 10 3
      worker/config.go
  4. 7 0
      worker/provider.go
  5. 9 8
      worker/worker.go
  6. 45 0
      worker/zfs_hook.go

+ 1 - 1
manager/db.go

@@ -182,7 +182,7 @@ func (b *boltAdapter) FlushDisabledJobs() (err error) {
 				err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
 				continue
 			}
-			if m.Status == Disabled {
+			if m.Status == Disabled || len(m.Name) == 0 {
 				err = c.Delete()
 			}
 		}

+ 11 - 3
worker/base_provider.go

@@ -23,6 +23,7 @@ type baseProvider struct {
 	logFile *os.File
 
 	cgroup *cgroupHook
+	zfs    *zfsHook
 	hooks  []jobHook
 }
 
@@ -77,12 +78,15 @@ func (p *baseProvider) LogFile() string {
 			return s
 		}
 	}
-	panic("log dir is impossible to be unavailable")
+	panic("log file is impossible to be unavailable")
 }
 
 func (p *baseProvider) AddHook(hook jobHook) {
-	if cg, ok := hook.(*cgroupHook); ok {
-		p.cgroup = cg
+	switch v := hook.(type) {
+	case *cgroupHook:
+		p.cgroup = v
+	case *zfsHook:
+		p.zfs = v
 	}
 	p.hooks = append(p.hooks, hook)
 }
@@ -95,6 +99,10 @@ func (p *baseProvider) Cgroup() *cgroupHook {
 	return p.cgroup
 }
 
+func (p *baseProvider) ZFS() *zfsHook {
+	return p.zfs
+}
+
 func (p *baseProvider) prepareLogFile() error {
 	if p.LogFile() == "/dev/null" {
 		p.cmd.SetLogFile(nil)

+ 10 - 3
worker/config.go

@@ -37,6 +37,7 @@ type Config struct {
 	Manager managerConfig  `toml:"manager"`
 	Server  serverConfig   `toml:"server"`
 	Cgroup  cgroupConfig   `toml:"cgroup"`
+	ZFS     zfsConfig      `toml:"zfs"`
 	Include includeConfig  `toml:"include"`
 	Mirrors []mirrorConfig `toml:"mirrors"`
 }
@@ -53,9 +54,10 @@ type globalConfig struct {
 }
 
 type managerConfig struct {
-	APIBase string `toml:"api_base"`
-	CACert  string `toml:"ca_cert"`
-	Token   string `toml:"token"`
+	APIBase         string   `toml:"api_base"`
+	CACert          string   `toml:"ca_cert"`
+	ExtraStatusAPIs []string `toml:"extra_status_managers"`
+	// Token   string `toml:"token"`
 }
 
 type serverConfig struct {
@@ -72,6 +74,11 @@ type cgroupConfig struct {
 	Group    string `toml:"group"`
 }
 
+type zfsConfig struct {
+	Enable bool   `toml:"enable"`
+	Zpool  string `toml:"zpool"`
+}
+
 type includeConfig struct {
 	IncludeMirrors string `toml:"include_mirrors"`
 }

+ 7 - 0
worker/provider.go

@@ -36,6 +36,8 @@ type mirrorProvider interface {
 	IsRunning() bool
 	// Cgroup
 	Cgroup() *cgroupHook
+	// ZFS
+	ZFS() *zfsHook
 
 	AddHook(hook jobHook)
 	Hooks() []jobHook
@@ -162,6 +164,11 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 	// Add Logging Hook
 	provider.AddHook(newLogLimiter(provider))
 
+	// Add ZFS Hook
+	if cfg.ZFS.Enable {
+		provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool))
+	}
+
 	// Add Cgroup Hook
 	if cfg.Cgroup.Enable {
 		provider.AddHook(

+ 9 - 8
worker/worker.go

@@ -405,12 +405,6 @@ func (w *Worker) registorWorker() {
 }
 
 func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
-	url := fmt.Sprintf(
-		"%s/workers/%s/jobs/%s",
-		w.cfg.Manager.APIBase,
-		w.Name(),
-		jobMsg.name,
-	)
 	p := job.provider
 	smsg := MirrorStatus{
 		Name:     jobMsg.name,
@@ -422,8 +416,15 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
 		ErrorMsg: jobMsg.msg,
 	}
 
-	if _, err := PostJSON(url, smsg, w.httpClient); err != nil {
-		logger.Errorf("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error())
+	apiBases := []string{w.cfg.Manager.APIBase}
+	apiBases = append(apiBases, w.cfg.Manager.ExtraStatusAPIs...)
+	for _, root := range apiBases {
+		url := fmt.Sprintf(
+			"%s/workers/%s/jobs/%s", root, w.Name(), jobMsg.name,
+		)
+		if _, err := PostJSON(url, smsg, w.httpClient); err != nil {
+			logger.Errorf("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error())
+		}
 	}
 }
 

+ 45 - 0
worker/zfs_hook.go

@@ -0,0 +1,45 @@
+package worker
+
+import (
+	"fmt"
+	"os"
+	"strings"
+
+	"github.com/codeskyblue/go-sh"
+)
+
+type zfsHook struct {
+	emptyHook
+	provider mirrorProvider
+	zpool    string
+}
+
+func newZfsHook(provider mirrorProvider, zpool string) *zfsHook {
+	return &zfsHook{
+		provider: provider,
+		zpool:    zpool,
+	}
+}
+
+// create zfs dataset for a new mirror
+func (z *zfsHook) preJob() error {
+	workingDir := z.provider.WorkingDir()
+	if _, err := os.Stat(workingDir); os.IsNotExist(err) {
+		// sudo zfs create $zfsDataset
+		// sudo zfs set mountpoint=${absPath} ${zfsDataset}
+
+		zfsDataset := fmt.Sprintf("%s/%s", z.zpool, z.provider.Name())
+		// Unknown issue of ZFS:
+		// dataset name should not contain upper case letters
+		zfsDataset = strings.ToLower(zfsDataset)
+		logger.Infof("Creating ZFS dataset %s", zfsDataset)
+		if err := sh.Command("sudo", "zfs", "create", zfsDataset).Run(); err != nil {
+			return err
+		}
+		logger.Infof("Mount ZFS dataset %s to %s", zfsDataset, workingDir)
+		if err := sh.Command("sudo", "zfs", "set", "mountpoint="+workingDir, zfsDataset).Run(); err != nil {
+			return err
+		}
+	}
+	return nil
+}