2
0

jobs.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. #!/usr/bin/env python2
  2. # -*- coding:utf-8 -*-
  3. import sh
  4. import sys
  5. from setproctitle import setproctitle
  6. import signal
  7. import Queue
  8. import traceback
  9. def run_job(sema, child_q, manager_q, provider, **settings):
  10. aquired = False
  11. setproctitle("tunasync-{}".format(provider.name))
  12. def before_quit(*args):
  13. provider.terminate()
  14. if aquired:
  15. print("{} release semaphore".format(provider.name))
  16. sema.release()
  17. sys.exit(0)
  18. def sleep_wait(timeout):
  19. try:
  20. msg = child_q.get(timeout=timeout)
  21. if msg == "terminate":
  22. manager_q.put(("CONFIG_ACK", (provider.name, "QUIT")))
  23. return True
  24. except Queue.Empty:
  25. return False
  26. signal.signal(signal.SIGTERM, before_quit)
  27. if provider.delay > 0:
  28. if sleep_wait(provider.delay):
  29. return
  30. max_retry = settings.get("max_retry", 1)
  31. def _real_run(idx=0, stage="job_hook", ctx=None):
  32. """\
  33. 4 stages:
  34. 0 -> job_hook, 1 -> set_retry, 2 -> exec_hook, 3 -> exec
  35. """
  36. assert(ctx is not None)
  37. if stage == "exec":
  38. # exec_job
  39. try:
  40. provider.run(ctx=ctx)
  41. provider.wait()
  42. except sh.ErrorReturnCode:
  43. status = "fail"
  44. else:
  45. status = "success"
  46. return status
  47. elif stage == "set_retry":
  48. # enter stage 3 with retry
  49. for retry in range(max_retry):
  50. status = "syncing"
  51. manager_q.put(("UPDATE", (provider.name, status, ctx)))
  52. print("start syncing {}, retry: {}".format(provider.name, retry))
  53. status = _real_run(idx=0, stage="exec_hook", ctx=ctx)
  54. if status == "success":
  55. break
  56. return status
  57. # job_hooks
  58. elif stage == "job_hook":
  59. if idx == len(provider.hooks):
  60. return _real_run(idx=idx, stage="set_retry", ctx=ctx)
  61. hook = provider.hooks[idx]
  62. hook_before, hook_after = hook.before_job, hook.after_job
  63. status = "pre-syncing"
  64. elif stage == "exec_hook":
  65. if idx == len(provider.hooks):
  66. return _real_run(idx=idx, stage="exec", ctx=ctx)
  67. hook = provider.hooks[idx]
  68. hook_before, hook_after = hook.before_exec, hook.after_exec
  69. status = "syncing"
  70. try:
  71. # print("%s run before_%s, %d" % (provider.name, stage, idx))
  72. hook_before(provider=provider, ctx=ctx)
  73. status = _real_run(idx=idx+1, stage=stage, ctx=ctx)
  74. except Exception:
  75. traceback.print_exc()
  76. status = "fail"
  77. finally:
  78. # print("%s run after_%s, %d" % (provider.name, stage, idx))
  79. # job may break when syncing
  80. if status != "success":
  81. status = "fail"
  82. try:
  83. hook_after(provider=provider, status=status, ctx=ctx)
  84. except Exception:
  85. traceback.print_exc()
  86. return status
  87. while 1:
  88. try:
  89. sema.acquire(True)
  90. except:
  91. break
  92. aquired = True
  93. ctx = {} # put context info in it
  94. ctx['current_dir'] = provider.local_dir
  95. ctx['mirror_name'] = provider.name
  96. status = "pre-syncing"
  97. manager_q.put(("UPDATE", (provider.name, status, ctx)))
  98. try:
  99. status = _real_run(idx=0, stage="job_hook", ctx=ctx)
  100. except Exception:
  101. traceback.print_exc()
  102. status = "fail"
  103. finally:
  104. sema.release()
  105. aquired = False
  106. print("syncing {} finished, sleep {} minutes for the next turn".format(
  107. provider.name, provider.interval
  108. ))
  109. manager_q.put(("UPDATE", (provider.name, status, ctx)))
  110. if sleep_wait(timeout=provider.interval * 60):
  111. break
  112. # vim: ts=4 sw=4 sts=4 expandtab