فهرست منبع

btrfs support

bigeagle 11 سال پیش
والد
کامیت
1ce19c88b1
6فایلهای تغییر یافته به همراه147 افزوده شده و 37 حذف شده
  1. 1 1
      README.md
  2. 22 16
      examples/tunasync.ini
  3. 50 0
      tunasync/btrfs_snapshot.py
  4. 8 0
      tunasync/jobs.py
  5. 20 12
      tunasync/mirror_provider.py
  6. 46 8
      tunasync/tunasync.py

+ 1 - 1
README.md

@@ -4,5 +4,5 @@ tunasync
 ## TODO
 
 - [ ] status file
-- [ ] btrfs backend (create snapshot before syncing)
+- [x] btrfs backend (create snapshot before syncing)
 - [ ] debmirror provider

+ 22 - 16
examples/tunasync.ini

@@ -1,36 +1,42 @@
 [global]
 log_dir = /var/log/tunasync
-local_dir = /srv/mirror
-storage_backend = btrfs
+; mirror_root = /srv/mirror_disk
+mirror_root = /mnt/sdb1/mirror
+use_btrfs = yes
+local_dir = {mirror_root}/{mirror_name}/_working
 ; maximum numbers of running jobs
-concurrent = 2
+concurrent = 3
 ; interval in minutes
-interval = 1
+interval = 120
+
+[btrfs]
+service_dir = {mirror_root}/{mirror_name}/_current
+working_dir = {mirror_root}/{mirror_name}/_working
+tmp_dir = {mirror_root}/{mirror_name}/_tmp
 
-# [mirror:archlinux]
-# provider = rsync
-# upstream = rsync://mirrors6.ustc.edu.cn/archlinux/
-# local_dir = /mnt/sdb1/mirror/archlinux/current/
-# log_file = /tmp/archlinux-{date}.log
-# use_ipv6 = yes
 
 [mirror:archlinux]
-provider = shell
-command = sleep 10
-local_dir = /mnt/sdb1/mirror/archlinux/current/
+provider = rsync
+upstream = rsync://mirror.us.leaseweb.net/archlinux/
 log_file = /tmp/archlinux-{date}.log
+use_ipv6 = yes
+
+# [mirror:archlinux]
+# provider = shell
+# command = sleep 10
+# local_dir = /mnt/sdb1/mirror/archlinux/current/
+# log_file = /tmp/archlinux-{date}.log
 
 
 [mirror:arch2]
 provider = shell
 command = sleep 5
-local_dir = /mnt/sdb1/mirror/archlinux/current/
 log_file = /tmp/arch2-{date}.log
+use_btrfs = no
 
 
 [mirror:arch3]
 provider = shell
 command = ./shell_provider.sh
-local_dir = /mnt/sdb1/mirror/archlinux/current/
 log_file = /tmp/arch3-{date}.log
-
+use_btrfs = no

+ 50 - 0
tunasync/btrfs_snapshot.py

@@ -0,0 +1,50 @@
+#!/usr/bin/env python2
+# -*- coding:utf-8 -*-
+import sh
+
+
+class BtrfsVolumeError(Exception):
+    pass
+
+
+class BtrfsHook(object):
+
+    def __init__(self, service_dir, working_dir, tmp_dir):
+        self.service_dir = service_dir
+        self.working_dir = working_dir
+        self.tmp_dir = tmp_dir
+
+    def before_job(self):
+        self._create_working_snapshot()
+
+    def after_job(self):
+        self._commit_changes()
+
+    def _ensure_subvolume(self):
+        # print(self.service_dir)
+        try:
+            ret = sh.btrfs("subvolume", "show", self.service_dir)
+        except Exception, e:
+            print(e)
+            raise BtrfsVolumeError("Invalid subvolume")
+
+        if ret.stderr != '':
+            raise BtrfsVolumeError("Invalid subvolume")
+
+    def _create_working_snapshot(self):
+        self._ensure_subvolume()
+        # print("btrfs subvolume snapshot {} {}".format(self.service_dir, self.working_dir))
+        sh.btrfs("subvolume", "snapshot", self.service_dir, self.working_dir)
+
+    def _commit_changes(self):
+        self._ensure_subvolume()
+        self._ensure_subvolume()
+        out = sh.mv(self.service_dir, self.tmp_dir)
+        assert out.exit_code == 0 and out.stderr == ""
+        out = sh.mv(self.working_dir, self.service_dir)
+        assert out.exit_code == 0 and out.stderr == ""
+        # print("btrfs subvolume delete {}".format(self.tmp_dir))
+        out = sh.btrfs("subvolume", "delete", self.tmp_dir)
+        assert out.exit_code == 0 and out.stderr == ""
+
+# vim: ts=4 sw=4 sts=4 expandtab

+ 8 - 0
tunasync/jobs.py

@@ -7,7 +7,15 @@ def run_job(sema, provider):
     while 1:
         sema.acquire(True)
         print("start syncing {}".format(provider.name))
+
+        for hook in provider.hooks:
+            hook.before_job()
+
         provider.run()
+
+        for hook in provider.hooks[::-1]:
+            hook.after_job()
+
         sema.release()
         print("syncing {} finished, sleep {} minutes for the next turn".format(
             provider.name, provider.interval

+ 20 - 12
tunasync/mirror_provider.py

@@ -10,6 +10,14 @@ class MirrorProvider(object):
     Mirror method class, can be `rsync', `debmirror', etc.
     '''
 
+    def __init__(self, name, local_dir, log_file="/dev/null",
+                 interval=120, hooks=[]):
+        self.name = name
+        self.local_dir = local_dir
+        self.log_file = log_file
+        self.interval = interval
+        self.hooks = hooks
+
     def run(self):
         raise NotImplementedError("run method should be implemented")
 
@@ -19,15 +27,14 @@ class RsyncProvider(MirrorProvider):
     _default_options = "-av --delete-after"
 
     def __init__(self, name, upstream_url, local_dir, useIPv6=True,
-                 exclude_file=None, log_file="/dev/null", interval=120):
+                 exclude_file=None, log_file="/dev/null", interval=120,
+                 hooks=[]):
+        super(RsyncProvider, self).__init__(name, local_dir, log_file,
+                                            interval, hooks)
 
-        self.name = name
         self.upstream_url = upstream_url
-        self.local_dir = local_dir
         self.useIPv6 = useIPv6
         self.exclude_file = exclude_file
-        self.log_file = log_file
-        self.interval = interval
 
     @property
     def options(self):
@@ -46,26 +53,27 @@ class RsyncProvider(MirrorProvider):
         return _options
 
     def run(self):
+
         _args = self.options
         _args.append(self.upstream_url)
         _args.append(self.local_dir)
         now = datetime.now().strftime("%Y-%m-%d_%H")
         log_file = self.log_file.format(date=now)
 
-        sh.rsync(*_args, _out=log_file, _err=log_file)
+        sh.rsync(*_args, _out=log_file, _err=log_file, _out_bufsize=1)
 
 
 class ShellProvider(MirrorProvider):
 
     def __init__(self, name, command, local_dir,
-                 log_file="/dev/null", interval=120):
-        self.name = name
+                 log_file="/dev/null", interval=120, hooks=[]):
+
+        super(ShellProvider, self).__init__(name, local_dir, log_file,
+                                            interval, hooks)
         self.command = command.split()
-        self.local_dir = local_dir
-        self.log_file = log_file
-        self.interval = interval
 
     def run(self):
+
         now = datetime.now().strftime("%Y-%m-%d_%H")
         log_file = self.log_file.format(date=now)
 
@@ -77,7 +85,7 @@ class ShellProvider(MirrorProvider):
         _args = [] if len(self.command) == 1 else self.command[1:]
 
         cmd = sh.Command(_cmd)
-        cmd(*_args, _env=new_env, _out=log_file, _err=log_file)
+        cmd(*_args, _env=new_env, _out=log_file, _err=log_file, _out_bufsize=1)
 
 
 # vim: ts=4 sw=4 sts=4 expandtab

+ 46 - 8
tunasync/tunasync.py

@@ -7,13 +7,15 @@ import signal
 from multiprocessing import Process, Semaphore
 from . import jobs
 from .mirror_provider import RsyncProvider, ShellProvider
+from .btrfs_snapshot import BtrfsHook
 
 
 class MirrorConfig(object):
 
     _valid_providers = set(("rsync", "debmirror", "shell", ))
 
-    def __init__(self, name, cfgParser, section):
+    def __init__(self, parent, name, cfgParser, section):
+        self._parent = parent
         self._cp = cfgParser
         self._sec = section
 
@@ -34,10 +36,13 @@ class MirrorConfig(object):
         elif provider == "shell":
             assert "command" in self.options
 
-        if "local_dir" not in self.options:
-            self.options["local_dir"] = os.path.join(
-                self._cp.get("global", "local_dir"),
-                self.name)
+        local_dir_tmpl = self.options.get(
+            "local_dir", self._cp.get("global", "local_dir"))
+
+        self.options["local_dir"] = local_dir_tmpl.format(
+            mirror_root=self._cp.get("global", "mirror_root"),
+            mirror_name=self.name,
+        )
 
         self.options["interval"] = int(
             self.options.get("interval",
@@ -50,6 +55,12 @@ class MirrorConfig(object):
             os.path.join(log_dir, self.name, "{date}.log")
         )
 
+        try:
+            self.options["use_btrfs"] = self._cp.getboolean(
+                self._sec, "use_btrfs")
+        except ConfigParser.NoOptionError:
+            self.options["use_btrfs"] = self._parent.use_btrfs
+
 
 class TUNASync(object):
 
@@ -73,6 +84,15 @@ class TUNASync(object):
         self.processes = []
         self.semaphore = Semaphore(self._settings.getint("global", "concurrent"))
 
+        self.mirror_root = self._settings.get("global", "mirror_root")
+        self.use_btrfs = self._settings.getboolean("global", "use_btrfs")
+        self.btrfs_service_dir_tmpl = self._settings.get(
+            "btrfs", "service_dir")
+        self.btrfs_working_dir_tmpl = self._settings.get(
+            "btrfs", "working_dir")
+        self.btrfs_tmp_dir_tmpl = self._settings.get(
+            "btrfs", "tmp_dir")
+
     @property
     def mirrors(self):
         if self._mirrors:
@@ -83,7 +103,7 @@ class TUNASync(object):
 
             _, name = section.split(":")
             self._mirrors.append(
-                MirrorConfig(name, self._settings, section))
+                MirrorConfig(self, name, self._settings, section))
         return self._mirrors
 
     @property
@@ -92,6 +112,22 @@ class TUNASync(object):
             return self._providers
 
         for mirror in self.mirrors:
+            hooks = []
+            if mirror.options["use_btrfs"]:
+                working_dir = self.btrfs_working_dir_tmpl.format(
+                    mirror_root=self.mirror_root,
+                    mirror_name=mirror.name
+                )
+                service_dir = self.btrfs_service_dir_tmpl.format(
+                    mirror_root=self.mirror_root,
+                    mirror_name=mirror.name
+                )
+                tmp_dir = self.btrfs_tmp_dir_tmpl.format(
+                    mirror_root=self.mirror_root,
+                    mirror_name=mirror.name
+                )
+                hooks.append(BtrfsHook(service_dir, working_dir, tmp_dir))
+
             if mirror.options["provider"] == "rsync":
                 self._providers.append(
                     RsyncProvider(
@@ -101,7 +137,8 @@ class TUNASync(object):
                         mirror.options["use_ipv6"],
                         mirror.options.get("exclude_file", None),
                         mirror.options["log_file"],
-                        mirror.options["interval"]
+                        mirror.options["interval"],
+                        hooks,
                     )
                 )
             elif mirror.options["provider"] == "shell":
@@ -111,7 +148,8 @@ class TUNASync(object):
                         mirror.options["command"],
                         mirror.options["local_dir"],
                         mirror.options["log_file"],
-                        mirror.options["interval"]
+                        mirror.options["interval"],
+                        hooks,
                     )
                 )