Przeglądaj źródła

Merge branch 'master' of github.com:tuna/tunasync

bigeagle 10 lat temu
rodzic
commit
8437cf720f

+ 4 - 2
README.md

@@ -3,10 +3,12 @@ tunasync
 
 ## TODO
 
-- [ ] implement `tunasynctl tail` and `tunasynctl log` or equivalent feature
+- [ ] use context manager to handle job contexts
+- [ ] Hooks need "pre_try", "post_try"
+- [x] implement `tunasynctl tail` and `tunasynctl log` or equivalent feature
 - [x] status file
     - [ ] mirror size
-    - [ ] upstream
+    - [x] upstream
 - [x] btrfs backend (create snapshot before syncing)
 - [x] add mirror job online
 - [x] use toml as configuration

+ 1 - 0
examples/tunasync.conf

@@ -31,6 +31,7 @@ provider = "shell"
 command = "sleep 10"
 local_dir = "/mnt/sdb1/mirror/archlinux/current/"
 # log_file = "/dev/null"
+exec_post_sync = "/bin/bash -c 'date --utc \"+%s\" > ${TUNASYNC_WORKING_DIR}/.timestamp'"
 
 [[mirrors]]
 name = "arch2"

+ 8 - 2
tunasync/clt_server.py

@@ -8,6 +8,10 @@ import struct
 
 class ControlServer(object):
 
+    valid_commands = set((
+        "start", "stop", "restart", "status", "log",
+    ))
+
     def __init__(self, address, mgr_chan, cld_chan):
         self.address = address
         self.mgr_chan = mgr_chan
@@ -19,7 +23,7 @@ class ControlServer(object):
                 raise Exception("file exists: {}".format(self.address))
         self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
         self.sock.bind(self.address)
-        os.chmod(address, 0700)
+        os.chmod(address, 0o700)
 
         print("Control Server listening on: {}".format(self.address))
         self.sock.listen(1)
@@ -32,7 +36,9 @@ class ControlServer(object):
                 length = struct.unpack('!H', conn.recv(2))[0]
                 content = conn.recv(length)
                 cmd = json.loads(content)
-                self.mgr_chan.put(("CMD", (cmd['cmd'], cmd['target'])))
+                if cmd['cmd'] not in self.valid_commands:
+                    raise Exception("Invalid Command")
+                self.mgr_chan.put(("CMD", (cmd['cmd'], cmd['target'], cmd["kwargs"])))
             except Exception as e:
                 print(e)
                 res = "Invalid Command"

+ 35 - 0
tunasync/exec_pre_post.py

@@ -0,0 +1,35 @@
+#!/usr/bin/env python2
+# -*- coding:utf-8 -*-
+import os
+import sh
+import shlex
+from .hook import JobHook
+
+
+class CmdExecHook(JobHook):
+    POST_SYNC = "post_sync"
+    PRE_SYNC = "pre_sync"
+
+    def __init__(self, command, exec_at=POST_SYNC):
+        self.command = shlex.split(command)
+        if exec_at == self.POST_SYNC:
+            self.before_job = self._keep_calm
+            self.after_job = self._exec
+        elif exec_at == self.PRE_SYNC:
+            self.before_job = self._exec
+            self.after_job = self._keep_calm
+
+    def _keep_calm(self, ctx={}, **kwargs):
+        pass
+
+    def _exec(self, ctx={}, **kwargs):
+        new_env = os.environ.copy()
+        new_env["TUNASYNC_MIRROR_NAME"] = ctx["mirror_name"]
+        new_env["TUNASYNC_WORKING_DIR"] = ctx["current_dir"]
+
+        _cmd = self.command[0]
+        _args = [] if len(self.command) == 1 else self.command[1:]
+        cmd = sh.Command(_cmd)
+        cmd(*_args, _env=new_env)
+
+# vim: ts=4 sw=4 sts=4 expandtab

+ 6 - 3
tunasync/jobs.py

@@ -36,10 +36,11 @@ def run_job(sema, child_q, manager_q, provider, **settings):
             break
         aquired = True
 
-        status = "syncing"
-        manager_q.put(("UPDATE", (provider.name, status)))
         ctx = {}   # put context info in it
         ctx['current_dir'] = provider.local_dir
+        ctx['mirror_name'] = provider.name
+        status = "pre-syncing"
+        manager_q.put(("UPDATE", (provider.name, status, ctx)))
 
         try:
             for hook in provider.hooks:
@@ -49,7 +50,9 @@ def run_job(sema, child_q, manager_q, provider, **settings):
             traceback.print_exc()
             status = "fail"
         else:
+            status = "syncing"
             for retry in range(max_retry):
+                manager_q.put(("UPDATE", (provider.name, status, ctx)))
                 print("start syncing {}, retry: {}".format(provider.name, retry))
                 provider.run(ctx=ctx)
 
@@ -77,7 +80,7 @@ def run_job(sema, child_q, manager_q, provider, **settings):
             provider.name, provider.interval
         ))
 
-        manager_q.put(("UPDATE", (provider.name, status)))
+        manager_q.put(("UPDATE", (provider.name, status, ctx)))
 
         try:
             msg = child_q.get(timeout=provider.interval * 60)

+ 11 - 0
tunasync/mirror_config.py

@@ -5,6 +5,7 @@ from datetime import datetime
 from .mirror_provider import RsyncProvider, ShellProvider
 from .btrfs_snapshot import BtrfsHook
 from .loglimit import LogLimitHook
+from .exec_pre_post import CmdExecHook
 
 
 class MirrorConfig(object):
@@ -80,6 +81,7 @@ class MirrorConfig(object):
                 local_dir=self.local_dir,
                 log_dir=self.log_dir,
                 log_file=self.log_file,
+                log_stdout=self.options.get("log_stdout", True),
                 interval=self.interval,
                 hooks=hooks
             )
@@ -126,6 +128,15 @@ class MirrorConfig(object):
             hooks.append(BtrfsHook(service_dir, working_dir, gc_dir))
 
         hooks.append(LogLimitHook())
+
+        if self.exec_pre_sync:
+            hooks.append(
+                CmdExecHook(self.exec_pre_sync, CmdExecHook.PRE_SYNC))
+
+        if self.exec_post_sync:
+            hooks.append(
+                CmdExecHook(self.exec_post_sync, CmdExecHook.POST_SYNC))
+
         return hooks
 
 # vim: ts=4 sw=4 sts=4 expandtab

+ 11 - 4
tunasync/mirror_provider.py

@@ -2,6 +2,7 @@
 # -*- coding:utf-8 -*-
 import sh
 import os
+import shlex
 from datetime import datetime
 
 
@@ -105,12 +106,13 @@ class RsyncProvider(MirrorProvider):
 class ShellProvider(MirrorProvider):
 
     def __init__(self, name, command, upstream_url, local_dir, log_dir,
-                 log_file="/dev/null", interval=120, hooks=[]):
+                 log_file="/dev/null", log_stdout=True, interval=120, hooks=[]):
 
         super(ShellProvider, self).__init__(name, local_dir, log_dir, log_file,
                                             interval, hooks)
         self.upstream_url = str(upstream_url)
-        self.command = command.split()
+        self.command = shlex.split(command)
+        self.log_stdout = log_stdout
 
     def run(self, ctx={}):
 
@@ -127,8 +129,13 @@ class ShellProvider(MirrorProvider):
         _args = [] if len(self.command) == 1 else self.command[1:]
 
         cmd = sh.Command(_cmd)
-        self.p = cmd(*_args, _env=new_env, _out=log_file,
-                     _err=log_file, _out_bufsize=1, _bg=True)
+
+        if self.log_stdout:
+            self.p = cmd(*_args, _env=new_env, _out=log_file,
+                         _err=log_file, _out_bufsize=1, _bg=True)
+        else:
+            self.p = cmd(*_args, _env=new_env, _out='/dev/null',
+                         _err='/dev/null', _out_bufsize=1, _bg=True)
 
 
 # vim: ts=4 sw=4 sts=4 expandtab

+ 6 - 2
tunasync/status_manager.py

@@ -32,8 +32,11 @@ class StatusManager(object):
             pass
 
         self.mirrors = mirrors
+        self.mirrors_ctx = {key: {} for key in self.mirrors}
 
     def get_info(self, name, key):
+        if key == "ctx":
+            return self.mirrors_ctx.get(name, {})
         _m = self.mirrors.get(name, {})
         return _m.get(key, None)
 
@@ -50,7 +53,7 @@ class StatusManager(object):
         self.mirrors[name] = dict(_m.items())
         self.commit_db()
 
-    def update_status(self, name, status):
+    def update_status(self, name, status, ctx={}):
 
         _m = self.mirrors.get(name, {
             'name': name,
@@ -58,7 +61,7 @@ class StatusManager(object):
             'status': '-',
         })
 
-        if status in ("syncing", "fail"):
+        if status in ("syncing", "fail", "pre-syncing"):
             update_time = _m["last_update"]
         elif status == "success":
             update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@@ -68,6 +71,7 @@ class StatusManager(object):
         _m['last_update'] = update_time
         _m['status'] = status
         self.mirrors[name] = dict(_m.items())
+        self.mirrors_ctx[name] = ctx
 
         self.commit_db()
         print("Updated status file, {}:{}".format(name, status))

+ 73 - 37
tunasync/tunasync.py

@@ -173,63 +173,99 @@ class TUNASync(object):
 
     def run_forever(self):
         while 1:
-
             try:
                 msg_hdr, msg_body = self.channel.get()
             except IOError:
                 continue
 
             if msg_hdr == "UPDATE":
-                name, status = msg_body
+                mirror_name, status, ctx = msg_body
                 try:
-                    self.status_manager.update_status(name, status)
+                    self.status_manager.update_status(
+                        mirror_name, status, dict(ctx.items()))
                 except Exception as e:
                     print(e)
 
             elif msg_hdr == "CONFIG_ACK":
-                name, status = msg_body
+                mirror_name, status = msg_body
                 if status == "QUIT":
-                    print("New configuration applied to {}".format(name))
-                    self.run_provider(name)
+                    print("New configuration applied to {}".format(mirror_name))
+                    self.run_provider(mirror_name)
 
             elif msg_hdr == "CMD":
-                cmd, name = msg_body
-                if (name not in self.mirrors) and (name != "__ALL__"):
+                cmd, mirror_name, kwargs = msg_body
+                if (mirror_name not in self.mirrors) and (mirror_name != "__ALL__"):
                     self.ctrl_channel.put("Invalid target")
                     continue
+                res = self.handle_cmd(cmd, mirror_name, kwargs)
+                self.ctrl_channel.put(res)
 
-                if cmd == "restart":
-                    _, p = self.processes[name]
-                    p.terminate()
-                    self.providers[name].set_delay(0)
-                    self.run_provider(name)
-                    res = "Restarted Job: {}".format(name)
+    def handle_cmd(self, cmd, mirror_name, kwargs):
+        if cmd == "restart":
+            _, p = self.processes[mirror_name]
+            p.terminate()
+            self.providers[mirror_name].set_delay(0)
+            self.run_provider(mirror_name)
+            res = "Restarted Job: {}".format(mirror_name)
+
+        elif cmd == "stop":
+            if mirror_name not in self.processes:
+                res = "{} not running".format(mirror_name)
+                return res
+
+            _, p = self.processes.pop(mirror_name)
+            p.terminate()
+            res = "Stopped Job: {}".format(mirror_name)
+
+        elif cmd == "start":
+            if mirror_name in self.processes:
+                res = "{} already running".format(mirror_name)
+                return res
+
+            self.run_provider(mirror_name)
+            res = "Started Job: {}".format(mirror_name)
+
+        elif cmd == "status":
+            if mirror_name == "__ALL__":
+                res = self.status_manager.list_status(_format=True)
+            else:
+                res = self.status_manager.get_status(mirror_name, _format=True)
+
+        elif cmd == "log":
+            job_ctx = self.status_manager.get_info(mirror_name, "ctx")
+            n = kwargs.get("n", 0)
+            if n == 0:
+                res = job_ctx.get("log_file", "/dev/null")
+            else:
+                import os
+                log_file = job_ctx.get("log_file", None)
+                if log_file is None:
+                    return "/dev/null"
 
-                elif cmd == "stop":
-                    if name not in self.processes:
-                        res = "{} not running".format(name)
-                        self.ctrl_channel.put(res)
-                        continue
+                log_dir = os.path.dirname(log_file)
+                lfiles = [
+                    os.path.join(log_dir, lfile)
+                    for lfile in os.listdir(log_dir)
+                    if lfile.startswith(mirror_name) and lfile != "latest"
+                ]
 
-                    _, p = self.processes.pop(name)
-                    p.terminate()
-                    res = "Stopped Job: {}".format(name)
+                if len(lfiles) <= n:
+                    res = "Only {} log files available".format(len(lfiles))
+                    return res
 
-                elif cmd == "start":
-                    if name in self.processes:
-                        res = "{} already running".format(name)
-                        self.ctrl_channel.put(res)
-                        continue
+                lfiles_set = set(lfiles)
+                # sort to get the newest 10 files
+                lfiles_ts = sorted(
+                    [(os.path.getmtime(lfile), lfile) for lfile in lfiles],
+                    key=lambda x: x[0],
+                    reverse=True,
+                )
+                return lfiles_ts[n][1]
+
+        else:
+            res = "Invalid command"
+
+        return res
 
-                    self.run_provider(name)
-                    res = "Started Job: {}".format(name)
-                elif cmd == "status":
-                    if name == "__ALL__":
-                        res = self.status_manager.list_status(_format=True)
-                    else:
-                        res = self.status_manager.get_status(name, _format=True)
-                else:
-                    res = "Invalid command"
 
-                self.ctrl_channel.put(res)
 # vim: ts=4 sw=4 sts=4 expandtab

+ 29 - 6
tunasynctl.py

@@ -10,22 +10,45 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser(prog="tunasynctl")
     parser.add_argument("-s", "--socket",
                         default="/var/run/tunasync.sock", help="socket file")
-    parser.add_argument("command", help="command")
-    parser.add_argument("target", nargs="?", default="__ALL__", help="target")
 
-    args = parser.parse_args()
+    subparsers = parser.add_subparsers(dest="command", help='sub-command help')
+
+    sp = subparsers.add_parser('start', help="start job")
+    sp.add_argument("target", help="mirror job name")
+
+    sp = subparsers.add_parser('stop', help="stop job")
+    sp.add_argument("target", help="mirror job name")
+
+    sp = subparsers.add_parser('restart', help="restart job")
+    sp.add_argument("target", help="mirror job name")
+
+    sp = subparsers.add_parser('status', help="show mirror status")
+    sp.add_argument("target", nargs="?", default="__ALL__", help="mirror job name")
+
+    sp = subparsers.add_parser('log', help="return log file path")
+    sp.add_argument("-n", type=int, default=0, help="last n-th log, default 0 (latest)")
+    sp.add_argument("target", help="mirror job name")
+
+    sp = subparsers.add_parser('help', help="show help message")
+
+    args = vars(parser.parse_args())
+
+    if args['command'] == "help":
+        parser.print_help()
+        sys.exit(0)
 
     sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 
     try:
-        sock.connect(args.socket)
+        sock.connect(args.pop("socket"))
     except socket.error as msg:
         print(msg)
         sys.exit(1)
 
     pack = json.dumps({
-        'cmd': args.command,
-        'target': args.target,
+        "cmd": args.pop("command"),
+        "target": args.pop("target"),
+        "kwargs": args,
     })
 
     try: