bigeagle vor 11 Jahren
Ursprung
Commit
65fd553761
8 geänderte Dateien mit 314 neuen und 0 gelöschten Zeilen
  1. 6 0
      README.md
  2. 4 0
      examples/shell_provider.sh
  3. 36 0
      examples/tunasync.ini
  4. 22 0
      tunasync.py
  5. 4 0
      tunasync/__init__.py
  6. 18 0
      tunasync/jobs.py
  7. 83 0
      tunasync/mirror_provider.py
  8. 141 0
      tunasync/tunasync.py

+ 6 - 0
README.md

@@ -1,2 +1,8 @@
 tunasync
 ========
+
+## TODO
+
+- [ ] status file
+- [ ] btrfs backend (create snapshot before syncing)
+- [ ] debmirror provider

+ 4 - 0
examples/shell_provider.sh

@@ -0,0 +1,4 @@
+#!/bin/bash
+echo $TUNASYNC_LOCAL_DIR
+echo $TUNASYNC_LOG_FILE
+sleep 5

+ 36 - 0
examples/tunasync.ini

@@ -0,0 +1,36 @@
+[global]
+log_dir = /var/log/tunasync
+local_dir = /srv/mirror
+storage_backend = btrfs
+; maximum numbers of running jobs
+concurrent = 2
+; interval in minutes
+interval = 1
+
+# [mirror:archlinux]
+# provider = rsync
+# upstream = rsync://mirrors6.ustc.edu.cn/archlinux/
+# local_dir = /mnt/sdb1/mirror/archlinux/current/
+# log_file = /tmp/archlinux-{date}.log
+# use_ipv6 = yes
+
+[mirror:archlinux]
+provider = shell
+command = sleep 10
+local_dir = /mnt/sdb1/mirror/archlinux/current/
+log_file = /tmp/archlinux-{date}.log
+
+
+[mirror:arch2]
+provider = shell
+command = sleep 5
+local_dir = /mnt/sdb1/mirror/archlinux/current/
+log_file = /tmp/arch2-{date}.log
+
+
+[mirror:arch3]
+provider = shell
+command = ./shell_provider.sh
+local_dir = /mnt/sdb1/mirror/archlinux/current/
+log_file = /tmp/arch3-{date}.log
+

+ 22 - 0
tunasync.py

@@ -0,0 +1,22 @@
+#!/usr/bin/env python2
+# -*- coding:utf-8 -*-
+import os
+import argparse
+from tunasync import TUNASync
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(prog="tunasync")
+    parser.add_argument("-c", "--config", default="tunasync.ini", help="config file")
+    parser.add_argument("--pidfile", default="/var/run/tunasync.pid", help="pidfile")
+
+    args = parser.parse_args()
+
+    with open(args.pidfile, 'w') as f:
+        f.write("{}".format(os.getpid()))
+
+    tunaSync = TUNASync()
+    tunaSync.read_config(args.config)
+
+    tunaSync.run_jobs()
+
+# vim: ts=4 sw=4 sts=4 expandtab

+ 4 - 0
tunasync/__init__.py

@@ -0,0 +1,4 @@
+#!/usr/bin/env python2
+# -*- coding:utf-8 -*-
+from .tunasync import TUNASync
+# vim: ts=4 sw=4 sts=4 expandtab

+ 18 - 0
tunasync/jobs.py

@@ -0,0 +1,18 @@
+#!/usr/bin/env python2
+# -*- coding:utf-8 -*-
+import time
+
+
+def run_job(sema, provider):
+    while 1:
+        sema.acquire(True)
+        print("start syncing {}".format(provider.name))
+        provider.run()
+        sema.release()
+        print("syncing {} finished, sleep {} minutes for the next turn".format(
+            provider.name, provider.interval
+        ))
+        time.sleep(provider.interval * 60)
+
+
+# vim: ts=4 sw=4 sts=4 expandtab

+ 83 - 0
tunasync/mirror_provider.py

@@ -0,0 +1,83 @@
+#!/usr/bin/env python2
+# -*- coding:utf-8 -*-
+import sh
+import os
+from datetime import datetime
+
+
+class MirrorProvider(object):
+    '''
+    Mirror method class, can be `rsync', `debmirror', etc.
+    '''
+
+    def run(self):
+        raise NotImplementedError("run method should be implemented")
+
+
+class RsyncProvider(MirrorProvider):
+
+    _default_options = "-av --delete-after"
+
+    def __init__(self, name, upstream_url, local_dir, useIPv6=True,
+                 exclude_file=None, log_file="/dev/null", interval=120):
+
+        self.name = name
+        self.upstream_url = upstream_url
+        self.local_dir = local_dir
+        self.useIPv6 = useIPv6
+        self.exclude_file = exclude_file
+        self.log_file = log_file
+        self.interval = interval
+
+    @property
+    def options(self):
+
+        _options = self._default_options.split()
+
+        if self.useIPv6:
+            _options.append("-6")
+        else:
+            _options.append("-4")
+
+        if self.exclude_file:
+            _options.append("--exclude-from")
+            _options.append(self.exclude_file)
+
+        return _options
+
+    def run(self):
+        _args = self.options
+        _args.append(self.upstream_url)
+        _args.append(self.local_dir)
+        now = datetime.now().strftime("%Y-%m-%d_%H")
+        log_file = self.log_file.format(date=now)
+
+        sh.rsync(*_args, _out=log_file, _err=log_file)
+
+
+class ShellProvider(MirrorProvider):
+
+    def __init__(self, name, command, local_dir,
+                 log_file="/dev/null", interval=120):
+        self.name = name
+        self.command = command.split()
+        self.local_dir = local_dir
+        self.log_file = log_file
+        self.interval = interval
+
+    def run(self):
+        now = datetime.now().strftime("%Y-%m-%d_%H")
+        log_file = self.log_file.format(date=now)
+
+        new_env = os.environ.copy()
+        new_env["TUNASYNC_LOCAL_DIR"] = self.local_dir
+        new_env["TUNASYNC_LOG_FILE"] = log_file
+
+        _cmd = self.command[0]
+        _args = [] if len(self.command) == 1 else self.command[1:]
+
+        cmd = sh.Command(_cmd)
+        cmd(*_args, _env=new_env, _out=log_file, _err=log_file)
+
+
+# vim: ts=4 sw=4 sts=4 expandtab

+ 141 - 0
tunasync/tunasync.py

@@ -0,0 +1,141 @@
+#!/usr/bin/env python2
+# -*- coding:utf-8 -*-
+import ConfigParser
+import os.path
+import signal
+
+from multiprocessing import Process, Semaphore
+from . import jobs
+from .mirror_provider import RsyncProvider, ShellProvider
+
+
+class MirrorConfig(object):
+
+    _valid_providers = set(("rsync", "debmirror", "shell", ))
+
+    def __init__(self, name, cfgParser, section):
+        self._cp = cfgParser
+        self._sec = section
+
+        self.name = name
+        self.options = dict(self._cp.items(self._sec))
+        self._validate()
+
+    def _validate(self):
+        provider = self.options.get("provider", None)
+        assert provider in self._valid_providers
+
+        if provider == "rsync":
+            assert "upstream" in self.options
+            if "use_ipv6" in self.options:
+                self.options["use_ipv6"] = self._cp.getboolean(self._sec,
+                                                               "use_ipv6")
+
+        elif provider == "shell":
+            assert "command" in self.options
+
+        if "local_dir" not in self.options:
+            self.options["local_dir"] = os.path.join(
+                self._cp.get("global", "local_dir"),
+                self.name)
+
+        self.options["interval"] = int(
+            self.options.get("interval",
+                             self._cp.getint("global", "interval"))
+        )
+
+        log_dir = self._cp.get("global", "log_dir")
+        self.options["log_file"] = self.options.get(
+            "log_file",
+            os.path.join(log_dir, self.name, "{date}.log")
+        )
+
+
+class TUNASync(object):
+
+    _instance = None
+    _settings = None
+    _inited = False
+
+    def __new__(cls, *args, **kwargs):
+        if not cls._instance:
+            cls._instance = super(TUNASync, cls).__new__(cls, *args, **kwargs)
+
+        return cls._instance
+
+    def read_config(self, config_file):
+        self._settings = ConfigParser.ConfigParser()
+        self._settings.read(config_file)
+
+        self._inited = True
+        self._mirrors = []
+        self._providers = []
+        self.processes = []
+        self.semaphore = Semaphore(self._settings.getint("global", "concurrent"))
+
+    @property
+    def mirrors(self):
+        if self._mirrors:
+            return self._mirrors
+
+        for section in filter(lambda s: s.startswith("mirror:"),
+                              self._settings.sections()):
+
+            _, name = section.split(":")
+            self._mirrors.append(
+                MirrorConfig(name, self._settings, section))
+        return self._mirrors
+
+    @property
+    def providers(self):
+        if self._providers:
+            return self._providers
+
+        for mirror in self.mirrors:
+            if mirror.options["provider"] == "rsync":
+                self._providers.append(
+                    RsyncProvider(
+                        mirror.name,
+                        mirror.options["upstream"],
+                        mirror.options["local_dir"],
+                        mirror.options["use_ipv6"],
+                        mirror.options.get("exclude_file", None),
+                        mirror.options["log_file"],
+                        mirror.options["interval"]
+                    )
+                )
+            elif mirror.options["provider"] == "shell":
+                self._providers.append(
+                    ShellProvider(
+                        mirror.name,
+                        mirror.options["command"],
+                        mirror.options["local_dir"],
+                        mirror.options["log_file"],
+                        mirror.options["interval"]
+                    )
+                )
+
+        return self._providers
+
+    def run_jobs(self):
+        for provider in self.providers:
+            p = Process(target=jobs.run_job, args=(self.semaphore, provider, ))
+            p.start()
+            self.processes.append(p)
+
+        def sig_handler(*args):
+            print("terminate subprocesses")
+            for p in self.processes:
+                p.terminate()
+            print("Good Bye")
+
+        signal.signal(signal.SIGINT, sig_handler)
+        signal.signal(signal.SIGTERM, sig_handler)
+
+    # def config(self, option):
+    #     if self._settings is None:
+    #         raise TUNASyncException("Config not inited")
+    #
+
+
+# vim: ts=4 sw=4 sts=4 expandtab