tunasync.py 8.9 KB


  1. #!/usr/bin/env python2
  2. # -*- coding:utf-8 -*-
  3. import signal
  4. import sys
  5. import toml
  6. from multiprocessing import Process, Semaphore, Queue
  7. from . import jobs
  8. from .hook import JobHook
  9. from .mirror_config import MirrorConfig
  10. from .status_manager import StatusManager
  11. from .clt_server import run_control_server
  12. class TUNASync(object):
  13. _instance = None
  14. _settings = None
  15. _inited = False
  16. def __new__(cls, *args, **kwargs):
  17. if not cls._instance:
  18. cls._instance = super(TUNASync, cls).__new__(cls, *args, **kwargs)
  19. return cls._instance
  20. def read_config(self, config_file):
  21. self._config_file = config_file
  22. with open(self._config_file) as f:
  23. self._settings = toml.loads(f.read())
  24. self._inited = True
  25. self._mirrors = {}
  26. self._providers = {}
  27. self.processes = {}
  28. self.semaphore = Semaphore(self._settings["global"]["concurrent"])
  29. self.channel = Queue()
  30. self._hooks = []
  31. self.mirror_root = self._settings["global"]["mirror_root"]
  32. self.use_btrfs = self._settings["global"]["use_btrfs"]
  33. self.btrfs_service_dir_tmpl = self._settings["btrfs"]["service_dir"]
  34. self.btrfs_working_dir_tmpl = self._settings["btrfs"]["working_dir"]
  35. self.btrfs_gc_dir_tmpl = self._settings["btrfs"]["gc_dir"]
  36. self.status_file = self._settings["global"]["status_file"]
  37. self.status_manager = StatusManager(self, self.status_file)
  38. self.ctrl_addr = self._settings["global"]["ctrl_addr"]
  39. self.ctrl_channel = Queue()
  40. p = Process(
  41. target=run_control_server,
  42. args=(self.ctrl_addr, self.channel, self.ctrl_channel),
  43. )
  44. p.start()
  45. self.processes["CTRL_SERVER"] = (self.ctrl_channel, p)
  46. def add_hook(self, h):
  47. assert isinstance(h, JobHook)
  48. self._hooks.append(h)
  49. def hooks(self):
  50. return self._hooks
  51. @property
  52. def mirrors(self):
  53. if self._mirrors:
  54. return self._mirrors
  55. for mirror_opt in self._settings["mirrors"]:
  56. name = mirror_opt["name"]
  57. self._mirrors[name] = \
  58. MirrorConfig(self, mirror_opt)
  59. return self._mirrors
  60. @property
  61. def providers(self):
  62. if self._providers:
  63. return self._providers
  64. for name, mirror in self.mirrors.iteritems():
  65. hooks = mirror.hooks() + self.hooks()
  66. provider = mirror.to_provider(hooks, no_delay=mirror.no_delay)
  67. self._providers[name] = provider
  68. return self._providers
  69. def run_jobs(self):
  70. for name in self.providers:
  71. self.run_provider(name)
  72. def sig_handler(*args):
  73. print("terminate subprocesses")
  74. for _, np in self.processes.iteritems():
  75. _, p = np
  76. p.terminate()
  77. print("Good Bye")
  78. sys.exit(0)
  79. signal.signal(signal.SIGINT, sig_handler)
  80. signal.signal(signal.SIGTERM, sig_handler)
  81. signal.signal(signal.SIGUSR1, self.reload_mirrors)
  82. signal.signal(signal.SIGUSR2, self.reload_mirrors_force)
  83. self.run_forever()
  84. def run_provider(self, name):
  85. if name not in self.providers:
  86. print("{} doesnot exist".format(name))
  87. return
  88. provider = self.providers[name]
  89. child_queue = Queue()
  90. p = Process(
  91. target=jobs.run_job,
  92. args=(self.semaphore, child_queue, self.channel, provider, ),
  93. kwargs={
  94. 'max_retry': self._settings['global']['max_retry']}
  95. )
  96. p.start()
  97. provider.set_delay(0) # clear delay after first start
  98. self.processes[name] = (child_queue, p)
  99. def reload_mirrors(self, signum, frame):
  100. try:
  101. return self._reload_mirrors(signum, frame, force=False)
  102. except Exception as e:
  103. print(e)
  104. def reload_mirrors_force(self, signum, frame):
  105. try:
  106. return self._reload_mirrors(signum, frame, force=True)
  107. except Exception as e:
  108. print(e)
  109. def _reload_mirrors(self, signum, frame, force=False):
  110. print("reload mirror configs, force restart: {}".format(force))
  111. with open(self._config_file) as f:
  112. self._settings = toml.loads(f.read())
  113. for mirror_opt in self._settings["mirrors"]:
  114. name = mirror_opt["name"]
  115. newMirCfg = MirrorConfig(self, mirror_opt)
  116. if name in self._mirrors:
  117. if newMirCfg.compare(self._mirrors[name]):
  118. continue
  119. self._mirrors[name] = newMirCfg
  120. hooks = newMirCfg.hooks() + self.hooks()
  121. newProvider = newMirCfg.to_provider(hooks, no_delay=True)
  122. self._providers[name] = newProvider
  123. if name in self.processes:
  124. q, p = self.processes[name]
  125. if force:
  126. p.terminate()
  127. print("Terminated Job: {}".format(name))
  128. self.run_provider(name)
  129. else:
  130. q.put("terminate")
  131. print("New configuration queued to {}".format(name))
  132. else:
  133. print("New mirror: {}".format(name))
  134. self.run_provider(name)
  135. self.status_manager.refresh_mirror(name)
  136. def run_forever(self):
  137. while 1:
  138. try:
  139. msg_hdr, msg_body = self.channel.get()
  140. except IOError:
  141. continue
  142. if msg_hdr == "UPDATE":
  143. mirror_name, status, ctx = msg_body
  144. try:
  145. self.status_manager.update_status(
  146. mirror_name, status, dict(ctx.items()))
  147. except Exception as e:
  148. print(e)
  149. elif msg_hdr == "CONFIG_ACK":
  150. mirror_name, status = msg_body
  151. if status == "QUIT":
  152. print("New configuration applied to {}".format(mirror_name))
  153. self.run_provider(mirror_name)
  154. elif msg_hdr == "CMD":
  155. cmd, mirror_name, kwargs = msg_body
  156. if (mirror_name not in self.mirrors) and (mirror_name != "__ALL__"):
  157. self.ctrl_channel.put("Invalid target")
  158. continue
  159. res = self.handle_cmd(cmd, mirror_name, kwargs)
  160. self.ctrl_channel.put(res)
  161. def handle_cmd(self, cmd, mirror_name, kwargs):
  162. if cmd == "restart":
  163. if mirror_name not in self.providers:
  164. res = "Invalid job: {}".format(mirror_name)
  165. return res
  166. if mirror_name in self.processes:
  167. _, p = self.processes[mirror_name]
  168. p.terminate()
  169. self.providers[mirror_name].set_delay(0)
  170. self.run_provider(mirror_name)
  171. res = "Restarted Job: {}".format(mirror_name)
  172. elif cmd == "stop":
  173. if mirror_name not in self.processes:
  174. res = "{} not running".format(mirror_name)
  175. return res
  176. _, p = self.processes.pop(mirror_name)
  177. p.terminate()
  178. res = "Stopped Job: {}".format(mirror_name)
  179. elif cmd == "start":
  180. if mirror_name in self.processes:
  181. res = "{} already running".format(mirror_name)
  182. return res
  183. self.run_provider(mirror_name)
  184. res = "Started Job: {}".format(mirror_name)
  185. elif cmd == "status":
  186. if mirror_name == "__ALL__":
  187. res = self.status_manager.list_status(_format=True)
  188. else:
  189. res = self.status_manager.get_status(mirror_name, _format=True)
  190. elif cmd == "log":
  191. job_ctx = self.status_manager.get_info(mirror_name, "ctx")
  192. n = kwargs.get("n", 0)
  193. if n == 0:
  194. res = job_ctx.get(
  195. "log_link",
  196. job_ctx.get("log_file", "/dev/null"),
  197. )
  198. else:
  199. import os
  200. log_file = job_ctx.get("log_file", None)
  201. if log_file is None:
  202. return "/dev/null"
  203. log_dir = os.path.dirname(log_file)
  204. lfiles = [
  205. os.path.join(log_dir, lfile)
  206. for lfile in os.listdir(log_dir)
  207. if lfile.startswith(mirror_name) and lfile != "latest"
  208. ]
  209. if len(lfiles) <= n:
  210. res = "Only {} log files available".format(len(lfiles))
  211. return res
  212. lfiles_set = set(lfiles)
  213. # sort to get the newest 10 files
  214. lfiles_ts = sorted(
  215. [(os.path.getmtime(lfile), lfile) for lfile in lfiles_set],
  216. key=lambda x: x[0],
  217. reverse=True,
  218. )
  219. return lfiles_ts[n][1]
  220. else:
  221. res = "Invalid command"
  222. return res
  223. # vim: ts=4 sw=4 sts=4 expandtab