Преглед на файлове

reload mirror jobs online

bigeagle преди 11 години
родител
ревизия
c80c35bba6
променени са 7 файла, в които са добавени 229 реда и са изтрити 70 реда
  1. 1 1
      README.md
  2. 19 12
      examples/tunasync.ini
  3. 2 1
      tunasync/btrfs_snapshot.py
  4. 13 0
      tunasync/hook.py
  5. 29 2
      tunasync/jobs.py
  6. 17 2
      tunasync/mirror_provider.py
  7. 148 52
      tunasync/tunasync.py

+ 1 - 1
README.md

@@ -5,5 +5,5 @@ tunasync
 
 - [ ] status file
 - [x] btrfs backend (create snapshot before syncing)
-- [ ] add mirror job online
+- [x] add mirror job online
 - [ ] debmirror provider

+ 19 - 12
examples/tunasync.ini

@@ -2,12 +2,13 @@
 log_dir = /var/log/tunasync
 ; mirror_root = /srv/mirror_disk
 mirror_root = /mnt/sdb1/mirror
-use_btrfs = yes
+use_btrfs = no
 local_dir = {mirror_root}/{mirror_name}/_working
 ; maximum numbers of running jobs
-concurrent = 3
+concurrent = 2
 ; interval in minutes
-interval = 120
+interval = 1
+max_retry = 2
 
 [btrfs]
 service_dir = {mirror_root}/{mirror_name}/_current
@@ -15,17 +16,17 @@ working_dir = {mirror_root}/{mirror_name}/_working
 tmp_dir = {mirror_root}/{mirror_name}/_tmp
 
 
+# rmirror:archlinux]
+# provider = rsync
+# upstream = rsync://mirror.us.leaseweb.net/archlinux/
+# log_file = /tmp/archlinux-{date}.log
+# use_ipv6 = yes
+
 [mirror:archlinux]
-provider = rsync
-upstream = rsync://mirror.us.leaseweb.net/archlinux/
+provider = shell
+command = sleep 20
+local_dir = /mnt/sdb1/mirror/archlinux/current/
 log_file = /tmp/archlinux-{date}.log
-use_ipv6 = yes
-
-# [mirror:archlinux]
-# provider = shell
-# command = sleep 10
-# local_dir = /mnt/sdb1/mirror/archlinux/current/
-# log_file = /tmp/archlinux-{date}.log
 
 
 [mirror:arch2]
@@ -40,3 +41,9 @@ provider = shell
 command = ./shell_provider.sh
 log_file = /tmp/arch3-{date}.log
 use_btrfs = no
+
+[mirror:arch4]
+provider = shell
+command = ./shell_provider.sh
+log_file = /tmp/arch4-{date}.log
+use_btrfs = no

+ 2 - 1
tunasync/btrfs_snapshot.py

@@ -2,13 +2,14 @@
 # -*- coding:utf-8 -*-
 import sh
 import os
+from .hook import JobHook
 
 
 class BtrfsVolumeError(Exception):
     pass
 
 
-class BtrfsHook(object):
+class BtrfsHook(JobHook):
 
     def __init__(self, service_dir, working_dir, tmp_dir):
         self.service_dir = service_dir

+ 13 - 0
tunasync/hook.py

@@ -0,0 +1,13 @@
+#!/usr/bin/env python2
+# -*- coding:utf-8 -*-
+
+
+class JobHook(object):
+
+    def before_job(self):
+        raise NotImplementedError("")
+
+    def after_job(self):
+        raise NotImplementedError("")
+
+# vim: ts=4 sw=4 sts=4 expandtab

+ 29 - 2
tunasync/jobs.py

@@ -1,22 +1,49 @@
 #!/usr/bin/env python2
 # -*- coding:utf-8 -*-
+import sys
 import time
+import signal
 
 
-def run_job(sema, provider):
+def run_job(sema, child_q, manager_q, provider):
+    aquired = False
+
+    def before_quit(*args):
+        provider.terminate()
+        if aquired:
+            print("{} release semaphore".format(provider.name))
+            sema.release()
+        sys.exit(0)
+
+    signal.signal(signal.SIGTERM, before_quit)
+
     while 1:
-        sema.acquire(True)
+        try:
+            sema.acquire(True)
+        except:
+            break
+        aquired = True
         print("start syncing {}".format(provider.name))
 
         for hook in provider.hooks:
             hook.before_job()
 
         provider.run()
+        provider.wait()
 
         for hook in provider.hooks[::-1]:
             hook.after_job()
 
         sema.release()
+        aquired = False
+        try:
+            msg = child_q.get(timeout=1)
+            if msg == "terminate":
+                manager_q.put((provider.name, "QUIT"))
+                break
+        except:
+            pass
+
         print("syncing {} finished, sleep {} minutes for the next turn".format(
             provider.name, provider.interval
         ))

+ 17 - 2
tunasync/mirror_provider.py

@@ -17,10 +17,22 @@ class MirrorProvider(object):
         self.log_file = log_file
         self.interval = interval
         self.hooks = hooks
+        self.p = None
 
     def run(self):
         raise NotImplementedError("run method should be implemented")
 
+    def terminate(self):
+        if self.p is not None:
+            self.p.process.terminate()
+            print("{} terminated".format(self.name))
+            self.p = None
+
+    def wait(self):
+        if self.p is not None:
+            self.p.wait()
+            self.p = None
+
 
 class RsyncProvider(MirrorProvider):
 
@@ -60,7 +72,8 @@ class RsyncProvider(MirrorProvider):
         now = datetime.now().strftime("%Y-%m-%d_%H")
         log_file = self.log_file.format(date=now)
 
-        sh.rsync(*_args, _out=log_file, _err=log_file, _out_bufsize=1)
+        self.p = sh.rsync(*_args, _out=log_file, _err=log_file,
+                          _out_bufsize=1, _bg=True)
 
 
 class ShellProvider(MirrorProvider):
@@ -78,6 +91,7 @@ class ShellProvider(MirrorProvider):
         log_file = self.log_file.format(date=now)
 
         new_env = os.environ.copy()
+        new_env["TUNASYNC_MIRROR_NAME"] = self.name
         new_env["TUNASYNC_LOCAL_DIR"] = self.local_dir
         new_env["TUNASYNC_LOG_FILE"] = log_file
 
@@ -85,7 +99,8 @@ class ShellProvider(MirrorProvider):
         _args = [] if len(self.command) == 1 else self.command[1:]
 
         cmd = sh.Command(_cmd)
-        cmd(*_args, _env=new_env, _out=log_file, _err=log_file, _out_bufsize=1)
+        self.p = cmd(*_args, _env=new_env, _out=log_file,
+                     _err=log_file, _out_bufsize=1, _bg=True)
 
 
 # vim: ts=4 sw=4 sts=4 expandtab

+ 148 - 52
tunasync/tunasync.py

@@ -3,8 +3,9 @@
 import ConfigParser
 import os.path
 import signal
+import sys
 
-from multiprocessing import Process, Semaphore
+from multiprocessing import Process, Semaphore, Queue
 from . import jobs
 from .mirror_provider import RsyncProvider, ShellProvider
 from .btrfs_snapshot import BtrfsHook
@@ -61,6 +62,65 @@ class MirrorConfig(object):
         except ConfigParser.NoOptionError:
             self.options["use_btrfs"] = self._parent.use_btrfs
 
+    def __getattr__(self, key):
+        if key in self.__dict__:
+            return self.__dict__[key]
+        else:
+            return self.__dict__["options"].get(key, None)
+
+    def to_provider(self, hooks=[]):
+        if self.provider == "rsync":
+            provider = RsyncProvider(
+                self.name,
+                self.upstream,
+                self.local_dir,
+                self.use_ipv6,
+                self.exclude_file,
+                self.log_file,
+                self.interval,
+                hooks,
+            )
+        elif self.options["provider"] == "shell":
+            provider = ShellProvider(
+                self.name,
+                self.command,
+                self.local_dir,
+                self.log_file,
+                self.interval,
+                hooks
+            )
+
+        return provider
+
+    def compare(self, other):
+        assert self.name == other.name
+
+        for key, val in self.options.iteritems():
+            if other.options.get(key, None) != val:
+                return False
+
+        return True
+
+    def hooks(self):
+        hooks = []
+        parent = self._parent
+        if self.options["use_btrfs"]:
+            working_dir = parent.btrfs_working_dir_tmpl.format(
+                mirror_root=parent.mirror_root,
+                mirror_name=self.name
+            )
+            service_dir = parent.btrfs_service_dir_tmpl.format(
+                mirror_root=parent.mirror_root,
+                mirror_name=self.name
+            )
+            tmp_dir = parent.btrfs_tmp_dir_tmpl.format(
+                mirror_root=parent.mirror_root,
+                mirror_name=self.name
+            )
+            hooks.append(BtrfsHook(service_dir, working_dir, tmp_dir))
+
+        return hooks
+
 
 class TUNASync(object):
 
@@ -75,14 +135,16 @@ class TUNASync(object):
         return cls._instance
 
     def read_config(self, config_file):
+        self._config_file = config_file
         self._settings = ConfigParser.ConfigParser()
         self._settings.read(config_file)
 
         self._inited = True
-        self._mirrors = []
-        self._providers = []
-        self.processes = []
+        self._mirrors = {}
+        self._providers = {}
+        self.processes = {}
         self.semaphore = Semaphore(self._settings.getint("global", "concurrent"))
+        self.channel = Queue()
 
         self.mirror_root = self._settings.get("global", "mirror_root")
         self.use_btrfs = self._settings.getboolean("global", "use_btrfs")
@@ -93,6 +155,9 @@ class TUNASync(object):
         self.btrfs_tmp_dir_tmpl = self._settings.get(
             "btrfs", "tmp_dir")
 
+    def hooks(self):
+        return []
+
     @property
     def mirrors(self):
         if self._mirrors:
@@ -102,8 +167,8 @@ class TUNASync(object):
                               self._settings.sections()):
 
             _, name = section.split(":")
-            self._mirrors.append(
-                MirrorConfig(self, name, self._settings, section))
+            self._mirrors[name] = \
+                MirrorConfig(self, name, self._settings, section)
         return self._mirrors
 
     @property
@@ -111,64 +176,95 @@ class TUNASync(object):
         if self._providers:
             return self._providers
 
-        for mirror in self.mirrors:
-            hooks = []
-            if mirror.options["use_btrfs"]:
-                working_dir = self.btrfs_working_dir_tmpl.format(
-                    mirror_root=self.mirror_root,
-                    mirror_name=mirror.name
-                )
-                service_dir = self.btrfs_service_dir_tmpl.format(
-                    mirror_root=self.mirror_root,
-                    mirror_name=mirror.name
-                )
-                tmp_dir = self.btrfs_tmp_dir_tmpl.format(
-                    mirror_root=self.mirror_root,
-                    mirror_name=mirror.name
-                )
-                hooks.append(BtrfsHook(service_dir, working_dir, tmp_dir))
-
-            if mirror.options["provider"] == "rsync":
-                self._providers.append(
-                    RsyncProvider(
-                        mirror.name,
-                        mirror.options["upstream"],
-                        mirror.options["local_dir"],
-                        mirror.options["use_ipv6"],
-                        mirror.options.get("exclude_file", None),
-                        mirror.options["log_file"],
-                        mirror.options["interval"],
-                        hooks,
-                    )
-                )
-            elif mirror.options["provider"] == "shell":
-                self._providers.append(
-                    ShellProvider(
-                        mirror.name,
-                        mirror.options["command"],
-                        mirror.options["local_dir"],
-                        mirror.options["log_file"],
-                        mirror.options["interval"],
-                        hooks,
-                    )
-                )
+        for name, mirror in self.mirrors.iteritems():
+            hooks = mirror.hooks() + self.hooks()
+            provider = mirror.to_provider(hooks)
+            self._providers[name] = provider
 
         return self._providers
 
     def run_jobs(self):
-        for provider in self.providers:
-            p = Process(target=jobs.run_job, args=(self.semaphore, provider, ))
-            p.start()
-            self.processes.append(p)
+        for name in self.providers:
+            self.run_provider(name)
 
         def sig_handler(*args):
             print("terminate subprocesses")
-            for p in self.processes:
+            for _, np in self.processes.iteritems():
+                _, p = np
                 p.terminate()
             print("Good Bye")
+            sys.exit(0)
 
         signal.signal(signal.SIGINT, sig_handler)
         signal.signal(signal.SIGTERM, sig_handler)
+        signal.signal(signal.SIGUSR1, self.reload_mirrors)
+        signal.signal(signal.SIGUSR2, self.reload_mirrors_force)
+
+        while 1:
+            name, status = self.channel.get()
+            if status == "QUIT":
+                print("New configuration applied to {}".format(name))
+                self.run_provider(name)
+
+    def run_provider(self, name):
+        if name not in self.providers:
+            print("{} doesnot exist".format(name))
+            return
+
+        provider = self.providers[name]
+        child_queue = Queue()
+        p = Process(
+            target=jobs.run_job,
+            args=(self.semaphore, child_queue, self.channel, provider, )
+        )
+        p.start()
+        self.processes[name] = (child_queue, p)
+
+    def reload_mirrors(self, signum, frame):
+        try:
+            return self._reload_mirrors(signum, frame, force=False)
+        except Exception, e:
+            print(e)
+
+    def reload_mirrors_force(self, signum, frame):
+        try:
+            return self._reload_mirrors(signum, frame, force=True)
+        except Exception, e:
+            print(e)
+
+    def _reload_mirrors(self, signum, frame, force=False):
+        print("reload mirror configs, force restart: {}".format(force))
+        self._settings.read(self._config_file)
+
+        for section in filter(lambda s: s.startswith("mirror:"),
+                              self._settings.sections()):
+
+            _, name = section.split(":")
+            newMirCfg = MirrorConfig(self, name, self._settings, section)
+
+            if name in self._mirrors:
+                if newMirCfg.compare(self._mirrors[name]):
+                    continue
+
+            self._mirrors[name] = newMirCfg
+
+            hooks = newMirCfg.hooks() + self.hooks()
+            newProvider = newMirCfg.to_provider(hooks)
+            self._providers[name] = newProvider
+
+            if name in self.processes:
+                q, p = self.processes[name]
+
+                if force:
+                    p.terminate()
+                    print("Terminated Job: {}".format(name))
+                    self.run_provider(name)
+                else:
+                    q.put("terminate")
+                    print("New configuration queued to {}".format(name))
+            else:
+                print("New mirror: {}".format(name))
+                self.run_provider(name)
 
     # def config(self, option):
     #     if self._settings is None: