mirror_provider.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. #!/usr/bin/env python2
  2. # -*- coding:utf-8 -*-
  3. import sh
  4. import os
  5. import shlex
  6. from datetime import datetime
  7. class MirrorProvider(object):
  8. '''
  9. Mirror method class, can be `rsync', `debmirror', etc.
  10. '''
  11. def __init__(self, name, local_dir, log_dir, log_file="/dev/null",
  12. interval=120, hooks=[]):
  13. self.name = name
  14. self.local_dir = local_dir
  15. self.log_file = log_file
  16. self.log_dir = log_dir
  17. self.interval = interval
  18. self.hooks = hooks
  19. self.p = None
  20. self.delay = 0
  21. # deprecated
  22. def ensure_log_dir(self):
  23. log_dir = os.path.dirname(self.log_file)
  24. if not os.path.exists(log_dir):
  25. sh.mkdir("-p", log_dir)
  26. def get_log_file(self, ctx={}):
  27. if 'log_file' in ctx:
  28. log_file = ctx['log_file']
  29. else:
  30. now = datetime.now().strftime("%Y-%m-%d_%H")
  31. log_file = self.log_file.format(date=now)
  32. ctx['log_file'] = log_file
  33. return log_file
  34. def set_delay(self, sec):
  35. ''' Set start delay '''
  36. self.delay = sec
  37. def run(self, ctx={}):
  38. raise NotImplementedError("run method should be implemented")
  39. def terminate(self):
  40. if self.p is not None:
  41. self.p.process.terminate()
  42. print("{} terminated".format(self.name))
  43. self.p = None
  44. def wait(self):
  45. if self.p is not None:
  46. self.p.wait()
  47. self.p = None
  48. class RsyncProvider(MirrorProvider):
  49. _default_options = ['-aHvh', '--no-o', '--no-g', '--stats',
  50. '--exclude', '.~tmp~/',
  51. '--delete', '--delete-after', '--delay-updates',
  52. '--safe-links', '--timeout=120', '--contimeout=120']
  53. def __init__(self, name, upstream_url, local_dir, log_dir,
  54. useIPv6=True, password=None, exclude_file=None,
  55. log_file="/dev/null", interval=120, env=None, hooks=[]):
  56. super(RsyncProvider, self).__init__(name, local_dir, log_dir, log_file,
  57. interval, hooks)
  58. self.upstream_url = upstream_url
  59. self.useIPv6 = useIPv6
  60. self.exclude_file = exclude_file
  61. self.password = password
  62. self.env = env
  63. @property
  64. def options(self):
  65. _options = [o for o in self._default_options] # copy
  66. if self.useIPv6:
  67. _options.append("-6")
  68. if self.exclude_file:
  69. _options.append("--exclude-from")
  70. _options.append(self.exclude_file)
  71. return _options
  72. def run(self, ctx={}):
  73. _args = self.options
  74. _args.append(self.upstream_url)
  75. working_dir = ctx.get("current_dir", self.local_dir)
  76. _args.append(working_dir)
  77. log_file = self.get_log_file(ctx)
  78. new_env = os.environ.copy()
  79. if self.password is not None:
  80. new_env["RSYNC_PASSWORD"] = self.password
  81. if self.env is not None and isinstance(self.env, dict):
  82. for k, v in self.env.items():
  83. new_env[k] = v
  84. self.p = sh.rsync(*_args, _env=new_env, _out=log_file,
  85. _err_to_out=True, _out_bufsize=1, _bg=True)
  86. class TwoStageRsyncProvider(RsyncProvider):
  87. _stage1_options = ['-aHvh', '--no-o', '--no-g',
  88. '--exclude', '.~tmp~/',
  89. '--safe-links', '--timeout=120', '--contimeout=120']
  90. _stage2_options = ['-aHvh', '--no-o', '--no-g', '--stats',
  91. '--exclude', '.~tmp~/',
  92. '--delete', '--delete-after', '--delay-updates',
  93. '--safe-links', '--timeout=120', '--contimeout=120']
  94. _stage1_profiles = {
  95. "debian": [
  96. 'Packages*', 'Sources*', 'Release*',
  97. 'InRelease', 'i18n/*', 'ls-lR*', 'dep11/*',
  98. ]
  99. }
  100. def set_stage1_profile(self, profile):
  101. if profile not in self._stage1_profiles:
  102. raise Exception("Profile Undefined: %s, %s" % (profile, self.name))
  103. self._stage1_excludes = self._stage1_profiles[profile]
  104. def options(self, stage):
  105. _default_options = self._stage1_options \
  106. if stage == 1 else self._stage2_options
  107. _options = [o for o in _default_options] # copy
  108. if stage == 1:
  109. for _exc in self._stage1_excludes:
  110. _options.append("--exclude")
  111. _options.append(_exc)
  112. if self.useIPv6:
  113. _options.append("-6")
  114. if self.exclude_file:
  115. _options.append("--exclude-from")
  116. _options.append(self.exclude_file)
  117. return _options
  118. def run(self, ctx={}):
  119. working_dir = ctx.get("current_dir", self.local_dir)
  120. log_file = self.get_log_file(ctx)
  121. new_env = os.environ.copy()
  122. if self.password is not None:
  123. new_env["RSYNC_PASSWORD"] = self.password
  124. if self.env is not None and isinstance(self.env, dict):
  125. for k, v in self.env.items():
  126. new_env[k] = v
  127. with open(log_file, 'w', buffering=1) as f:
  128. def log_output(line):
  129. f.write(line)
  130. for stage in (1, 2):
  131. _args = self.options(stage)
  132. _args.append(self.upstream_url)
  133. _args.append(working_dir)
  134. f.write("==== Stage {} Begins ====\n\n".format(stage))
  135. self.p = sh.rsync(
  136. *_args, _env=new_env, _out=log_output,
  137. _err_to_out=True, _out_bufsize=1, _bg=False
  138. )
  139. self.p.wait()
  140. class ShellProvider(MirrorProvider):
  141. def __init__(self, name, command, upstream_url, local_dir, log_dir,
  142. log_file="/dev/null", log_stdout=True, interval=120, env=None,
  143. hooks=[]):
  144. super(ShellProvider, self).__init__(name, local_dir, log_dir, log_file,
  145. interval, hooks)
  146. self.upstream_url = str(upstream_url)
  147. self.command = shlex.split(command)
  148. self.log_stdout = log_stdout
  149. self.env = env
  150. def run(self, ctx={}):
  151. log_file = self.get_log_file(ctx)
  152. new_env = os.environ.copy()
  153. new_env["TUNASYNC_MIRROR_NAME"] = self.name
  154. new_env["TUNASYNC_LOCAL_DIR"] = self.local_dir
  155. new_env["TUNASYNC_WORKING_DIR"] = ctx.get("current_dir", self.local_dir)
  156. new_env["TUNASYNC_UPSTREAM_URL"] = self.upstream_url
  157. new_env["TUNASYNC_LOG_FILE"] = log_file
  158. if self.env is not None and isinstance(self.env, dict):
  159. for k, v in self.env.items():
  160. new_env[k] = v
  161. _cmd = self.command[0]
  162. _args = [] if len(self.command) == 1 else self.command[1:]
  163. cmd = sh.Command(_cmd)
  164. if self.log_stdout:
  165. self.p = cmd(*_args, _env=new_env, _out=log_file,
  166. _err_to_out=True, _out_bufsize=1, _bg=True)
  167. else:
  168. self.p = cmd(*_args, _env=new_env, _out='/dev/null',
  169. _err='/dev/null', _out_bufsize=1, _bg=True)
  170. # vim: ts=4 sw=4 sts=4 expandtab