Browse Source

more robust job exec

bigeagle 10 years ago
parent
commit
1dc133fcb3
2 changed files with 80 additions and 46 deletions
  1. 1 0
      tunasync/exec_pre_post.py
  2. 79 46
      tunasync/jobs.py

+ 1 - 0
tunasync/exec_pre_post.py

@@ -26,6 +26,7 @@ class CmdExecHook(JobHook):
         new_env = os.environ.copy()
         new_env = os.environ.copy()
         new_env["TUNASYNC_MIRROR_NAME"] = ctx["mirror_name"]
         new_env["TUNASYNC_MIRROR_NAME"] = ctx["mirror_name"]
         new_env["TUNASYNC_WORKING_DIR"] = ctx["current_dir"]
         new_env["TUNASYNC_WORKING_DIR"] = ctx["current_dir"]
+        new_env["TUNASYNC_JOB_EXIT_STATUS"] = kwargs.get("status", "")
 
 
         _cmd = self.command[0]
         _cmd = self.command[0]
         _args = [] if len(self.command) == 1 else self.command[1:]
         _args = [] if len(self.command) == 1 else self.command[1:]

+ 79 - 46
tunasync/jobs.py

@@ -5,6 +5,7 @@ import sys
 from setproctitle import setproctitle
 from setproctitle import setproctitle
 import signal
 import signal
 import Queue
 import Queue
+import traceback
 
 
 
 
 def run_job(sema, child_q, manager_q, provider, **settings):
 def run_job(sema, child_q, manager_q, provider, **settings):
@@ -18,17 +19,87 @@ def run_job(sema, child_q, manager_q, provider, **settings):
             sema.release()
             sema.release()
         sys.exit(0)
         sys.exit(0)
 
 
-    signal.signal(signal.SIGTERM, before_quit)
-    if provider.delay > 0:
+    def sleep_wait(timeout):
         try:
         try:
-            msg = child_q.get(timeout=provider.delay)
+            msg = child_q.get(timeout=timeout)
             if msg == "terminate":
             if msg == "terminate":
                 manager_q.put(("CONFIG_ACK", (provider.name, "QUIT")))
                 manager_q.put(("CONFIG_ACK", (provider.name, "QUIT")))
-                return
+                return True
         except Queue.Empty:
         except Queue.Empty:
-            pass
+            return False
+
+    signal.signal(signal.SIGTERM, before_quit)
+
+    if provider.delay > 0:
+        if sleep_wait(provider.delay):
+            return
 
 
     max_retry = settings.get("max_retry", 1)
     max_retry = settings.get("max_retry", 1)
+
+    def _real_run(idx=0, stage="job_hook", ctx=None):
+        """\
+        4 stages:
+            0 -> job_hook, 1 -> set_retry, 2 -> exec_hook, 3 -> exec
+        """
+
+        assert(ctx is not None)
+
+        if stage == "exec":
+            # exec_job
+            try:
+                provider.run(ctx=ctx)
+                provider.wait()
+            except sh.ErrorReturnCode:
+                status = "fail"
+            else:
+                status = "success"
+            return status
+
+        elif stage == "set_retry":
+            # enter stage 3 with retry
+            for retry in range(max_retry):
+                status = "syncing"
+                manager_q.put(("UPDATE", (provider.name, status, ctx)))
+                print("start syncing {}, retry: {}".format(provider.name, retry))
+                status = _real_run(idx=0, stage="exec_hook", ctx=ctx)
+                if status == "success":
+                    break
+            return status
+
+        # job_hooks
+        elif stage == "job_hook":
+            if idx == len(provider.hooks):
+                return _real_run(idx=idx, stage="set_retry", ctx=ctx)
+            hook = provider.hooks[idx]
+            hook_before, hook_after = hook.before_job, hook.after_job
+            status = "pre-syncing"
+
+        elif stage == "exec_hook":
+            if idx == len(provider.hooks):
+                return _real_run(idx=idx, stage="exec", ctx=ctx)
+            hook = provider.hooks[idx]
+            hook_before, hook_after = hook.before_exec, hook.after_exec
+            status = "syncing"
+
+        try:
+            # print("%s run before_%s, %d" % (provider.name, stage, idx))
+            hook_before(provider=provider, ctx=ctx)
+            status = _real_run(idx=idx+1, stage=stage, ctx=ctx)
+        except Exception:
+            traceback.print_exc()
+            status = "fail"
+        finally:
+            # print("%s run after_%s, %d" % (provider.name, stage, idx))
+            # job may break when syncing
+            if status != "success":
+                status = "fail"
+            try:
+                hook_after(provider=provider, status=status, ctx=ctx)
+            except Exception:
+                traceback.print_exc()
+
+        return status
+
     while 1:
     while 1:
         try:
         try:
             sema.acquire(True)
             sema.acquire(True)
@@ -43,43 +114,10 @@ def run_job(sema, child_q, manager_q, provider, **settings):
         manager_q.put(("UPDATE", (provider.name, status, ctx)))
         manager_q.put(("UPDATE", (provider.name, status, ctx)))
 
 
         try:
         try:
-            # before_job hooks
-            for hook in provider.hooks:
-                hook.before_job(provider=provider, ctx=ctx)
-
-            for retry in range(max_retry):
-                # before_exec hooks
-                for hook in provider.hooks:
-                    hook.before_exec(provider=provider, ctx=ctx)
-
-                status = "syncing"
-                manager_q.put(("UPDATE", (provider.name, status, ctx)))
-                print("start syncing {}, retry: {}".format(provider.name, retry))
-
-                try:
-                    provider.run(ctx=ctx)
-                    provider.wait()
-                except sh.ErrorReturnCode:
-                    status = "fail"
-                else:
-                    status = "success"
-
-                # after_exec hooks
-                for hook in provider.hooks:
-                    hook.after_exec(provider=provider, status=status, ctx=ctx)
-
-                if status == "success":
-                    break
-
-            # after_job hooks
-            for hook in provider.hooks[::-1]:
-                hook.after_job(provider=provider, status=status, ctx=ctx)
-
+            status = _real_run(idx=0, stage="job_hook", ctx=ctx)
         except Exception:
         except Exception:
-            import traceback
             traceback.print_exc()
             traceback.print_exc()
             status = "fail"
             status = "fail"
-
         finally:
         finally:
             sema.release()
             sema.release()
             aquired = False
             aquired = False
@@ -90,13 +128,8 @@ def run_job(sema, child_q, manager_q, provider, **settings):
 
 
         manager_q.put(("UPDATE", (provider.name, status, ctx)))
         manager_q.put(("UPDATE", (provider.name, status, ctx)))
 
 
-        try:
-            msg = child_q.get(timeout=provider.interval * 60)
-            if msg == "terminate":
-                manager_q.put(("CONFIG_ACK", (provider.name, "QUIT")))
-                break
-        except Queue.Empty:
-            pass
+        if sleep_wait(timeout=provider.interval * 60):
+            break
 
 
 
 
 # vim: ts=4 sw=4 sts=4 expandtab
 # vim: ts=4 sw=4 sts=4 expandtab