job_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. package worker
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "os"
  6. "path/filepath"
  7. "testing"
  8. "time"
  9. . "github.com/smartystreets/goconvey/convey"
  10. . "github.com/tuna/tunasync/internal"
  11. )
  12. func TestMirrorJob(t *testing.T) {
  13. InitLogger(true, true, false)
  14. Convey("MirrorJob should work", t, func(ctx C) {
  15. tmpDir, err := ioutil.TempDir("", "tunasync")
  16. defer os.RemoveAll(tmpDir)
  17. So(err, ShouldBeNil)
  18. scriptFile := filepath.Join(tmpDir, "cmd.sh")
  19. tmpFile := filepath.Join(tmpDir, "log_file")
  20. c := cmdConfig{
  21. name: "tuna-cmd-jobtest",
  22. upstreamURL: "http://mirrors.tuna.moe/",
  23. command: "bash " + scriptFile,
  24. workingDir: tmpDir,
  25. logDir: tmpDir,
  26. logFile: tmpFile,
  27. interval: 1 * time.Second,
  28. }
  29. provider, err := newCmdProvider(c)
  30. So(err, ShouldBeNil)
  31. So(provider.Name(), ShouldEqual, c.name)
  32. So(provider.WorkingDir(), ShouldEqual, c.workingDir)
  33. So(provider.LogDir(), ShouldEqual, c.logDir)
  34. So(provider.LogFile(), ShouldEqual, c.logFile)
  35. So(provider.Interval(), ShouldEqual, c.interval)
  36. Convey("For a normal mirror job", func(ctx C) {
  37. scriptContent := `#!/bin/bash
  38. echo $TUNASYNC_WORKING_DIR
  39. echo $TUNASYNC_MIRROR_NAME
  40. echo $TUNASYNC_UPSTREAM_URL
  41. echo $TUNASYNC_LOG_FILE
  42. `
  43. expectedOutput := fmt.Sprintf(
  44. "%s\n%s\n%s\n%s\n",
  45. provider.WorkingDir(),
  46. provider.Name(),
  47. provider.upstreamURL,
  48. provider.LogFile(),
  49. )
  50. err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
  51. So(err, ShouldBeNil)
  52. readedScriptContent, err := ioutil.ReadFile(scriptFile)
  53. So(err, ShouldBeNil)
  54. So(readedScriptContent, ShouldResemble, []byte(scriptContent))
  55. Convey("If we let it run several times", func(ctx C) {
  56. managerChan := make(chan jobMessage, 10)
  57. semaphore := make(chan empty, 1)
  58. job := newMirrorJob(provider)
  59. go job.Run(managerChan, semaphore)
  60. // job should not start if we don't start it
  61. select {
  62. case <-managerChan:
  63. So(0, ShouldEqual, 1) // made this fail
  64. case <-time.After(1 * time.Second):
  65. So(0, ShouldEqual, 0)
  66. }
  67. job.ctrlChan <- jobStart
  68. for i := 0; i < 2; i++ {
  69. msg := <-managerChan
  70. So(msg.status, ShouldEqual, PreSyncing)
  71. msg = <-managerChan
  72. So(msg.status, ShouldEqual, Syncing)
  73. msg = <-managerChan
  74. So(msg.status, ShouldEqual, Success)
  75. loggedContent, err := ioutil.ReadFile(provider.LogFile())
  76. So(err, ShouldBeNil)
  77. So(string(loggedContent), ShouldEqual, expectedOutput)
  78. job.ctrlChan <- jobStart
  79. }
  80. select {
  81. case msg := <-managerChan:
  82. So(msg.status, ShouldEqual, PreSyncing)
  83. msg = <-managerChan
  84. So(msg.status, ShouldEqual, Syncing)
  85. msg = <-managerChan
  86. So(msg.status, ShouldEqual, Success)
  87. case <-time.After(2 * time.Second):
  88. So(0, ShouldEqual, 1)
  89. }
  90. job.ctrlChan <- jobDisable
  91. select {
  92. case <-managerChan:
  93. So(0, ShouldEqual, 1) // made this fail
  94. case <-job.disabled:
  95. So(0, ShouldEqual, 0)
  96. }
  97. })
  98. })
  99. Convey("When running long jobs with post-fail hook", func(ctx C) {
  100. scriptContent := `#!/bin/bash
  101. echo '++++++'
  102. echo $TUNASYNC_WORKING_DIR
  103. echo $0 sleeping
  104. sleep 3
  105. echo $TUNASYNC_WORKING_DIR
  106. echo '------'
  107. `
  108. err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
  109. So(err, ShouldBeNil)
  110. hookScriptFile := filepath.Join(tmpDir, "hook.sh")
  111. err = ioutil.WriteFile(hookScriptFile, []byte(scriptContent), 0755)
  112. So(err, ShouldBeNil)
  113. h, err := newExecPostHook(provider, execOnFailure, hookScriptFile)
  114. So(err, ShouldBeNil)
  115. provider.AddHook(h)
  116. managerChan := make(chan jobMessage, 10)
  117. semaphore := make(chan empty, 1)
  118. job := newMirrorJob(provider)
  119. Convey("If we kill it", func(ctx C) {
  120. go job.Run(managerChan, semaphore)
  121. job.ctrlChan <- jobStart
  122. time.Sleep(1 * time.Second)
  123. msg := <-managerChan
  124. So(msg.status, ShouldEqual, PreSyncing)
  125. msg = <-managerChan
  126. So(msg.status, ShouldEqual, Syncing)
  127. job.ctrlChan <- jobStop
  128. msg = <-managerChan
  129. So(msg.status, ShouldEqual, Failed)
  130. job.ctrlChan <- jobDisable
  131. <-job.disabled
  132. })
  133. Convey("If we kill it then start it", func(ctx C) {
  134. go job.Run(managerChan, semaphore)
  135. job.ctrlChan <- jobStart
  136. time.Sleep(1 * time.Second)
  137. msg := <-managerChan
  138. So(msg.status, ShouldEqual, PreSyncing)
  139. msg = <-managerChan
  140. So(msg.status, ShouldEqual, Syncing)
  141. job.ctrlChan <- jobStop
  142. time.Sleep(2 * time.Second)
  143. logger.Debugf("Now starting...\n")
  144. job.ctrlChan <- jobStart
  145. msg = <-managerChan
  146. So(msg.status, ShouldEqual, Failed)
  147. job.ctrlChan <- jobDisable
  148. <-job.disabled
  149. })
  150. })
  151. Convey("When running long jobs", func(ctx C) {
  152. scriptContent := `#!/bin/bash
  153. echo $TUNASYNC_WORKING_DIR
  154. sleep 5
  155. echo $TUNASYNC_WORKING_DIR
  156. `
  157. err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
  158. So(err, ShouldBeNil)
  159. managerChan := make(chan jobMessage, 10)
  160. semaphore := make(chan empty, 1)
  161. job := newMirrorJob(provider)
  162. Convey("If we kill it", func(ctx C) {
  163. go job.Run(managerChan, semaphore)
  164. job.ctrlChan <- jobStart
  165. time.Sleep(1 * time.Second)
  166. msg := <-managerChan
  167. So(msg.status, ShouldEqual, PreSyncing)
  168. msg = <-managerChan
  169. So(msg.status, ShouldEqual, Syncing)
  170. job.ctrlChan <- jobStart // should be ignored
  171. job.ctrlChan <- jobStop
  172. msg = <-managerChan
  173. So(msg.status, ShouldEqual, Failed)
  174. expectedOutput := fmt.Sprintf("%s\n", provider.WorkingDir())
  175. loggedContent, err := ioutil.ReadFile(provider.LogFile())
  176. So(err, ShouldBeNil)
  177. So(string(loggedContent), ShouldEqual, expectedOutput)
  178. job.ctrlChan <- jobDisable
  179. <-job.disabled
  180. })
  181. Convey("If we don't kill it", func(ctx C) {
  182. go job.Run(managerChan, semaphore)
  183. job.ctrlChan <- jobStart
  184. msg := <-managerChan
  185. So(msg.status, ShouldEqual, PreSyncing)
  186. msg = <-managerChan
  187. So(msg.status, ShouldEqual, Syncing)
  188. msg = <-managerChan
  189. So(msg.status, ShouldEqual, Success)
  190. expectedOutput := fmt.Sprintf(
  191. "%s\n%s\n",
  192. provider.WorkingDir(), provider.WorkingDir(),
  193. )
  194. loggedContent, err := ioutil.ReadFile(provider.LogFile())
  195. So(err, ShouldBeNil)
  196. So(string(loggedContent), ShouldEqual, expectedOutput)
  197. job.ctrlChan <- jobDisable
  198. <-job.disabled
  199. })
  200. Convey("If we restart it", func(ctx C) {
  201. go job.Run(managerChan, semaphore)
  202. job.ctrlChan <- jobStart
  203. msg := <-managerChan
  204. So(msg.status, ShouldEqual, PreSyncing)
  205. msg = <-managerChan
  206. So(msg.status, ShouldEqual, Syncing)
  207. job.ctrlChan <- jobRestart
  208. msg = <-managerChan
  209. So(msg.status, ShouldEqual, Failed)
  210. So(msg.msg, ShouldEqual, "killed by manager")
  211. msg = <-managerChan
  212. So(msg.status, ShouldEqual, PreSyncing)
  213. msg = <-managerChan
  214. So(msg.status, ShouldEqual, Syncing)
  215. msg = <-managerChan
  216. So(msg.status, ShouldEqual, Success)
  217. expectedOutput := fmt.Sprintf(
  218. "%s\n%s\n",
  219. provider.WorkingDir(), provider.WorkingDir(),
  220. )
  221. loggedContent, err := ioutil.ReadFile(provider.LogFile())
  222. So(err, ShouldBeNil)
  223. So(string(loggedContent), ShouldEqual, expectedOutput)
  224. job.ctrlChan <- jobDisable
  225. <-job.disabled
  226. })
  227. Convey("If we disable it", func(ctx C) {
  228. go job.Run(managerChan, semaphore)
  229. job.ctrlChan <- jobStart
  230. msg := <-managerChan
  231. So(msg.status, ShouldEqual, PreSyncing)
  232. msg = <-managerChan
  233. So(msg.status, ShouldEqual, Syncing)
  234. job.ctrlChan <- jobDisable
  235. msg = <-managerChan
  236. So(msg.status, ShouldEqual, Failed)
  237. So(msg.msg, ShouldEqual, "killed by manager")
  238. <-job.disabled
  239. })
  240. Convey("If we stop it twice, than start it", func(ctx C) {
  241. go job.Run(managerChan, semaphore)
  242. job.ctrlChan <- jobStart
  243. msg := <-managerChan
  244. So(msg.status, ShouldEqual, PreSyncing)
  245. msg = <-managerChan
  246. So(msg.status, ShouldEqual, Syncing)
  247. job.ctrlChan <- jobStop
  248. msg = <-managerChan
  249. So(msg.status, ShouldEqual, Failed)
  250. So(msg.msg, ShouldEqual, "killed by manager")
  251. job.ctrlChan <- jobStop // should be ignored
  252. job.ctrlChan <- jobStart
  253. msg = <-managerChan
  254. So(msg.status, ShouldEqual, PreSyncing)
  255. msg = <-managerChan
  256. So(msg.status, ShouldEqual, Syncing)
  257. msg = <-managerChan
  258. So(msg.status, ShouldEqual, Success)
  259. expectedOutput := fmt.Sprintf(
  260. "%s\n%s\n",
  261. provider.WorkingDir(), provider.WorkingDir(),
  262. )
  263. loggedContent, err := ioutil.ReadFile(provider.LogFile())
  264. So(err, ShouldBeNil)
  265. So(string(loggedContent), ShouldEqual, expectedOutput)
  266. job.ctrlChan <- jobDisable
  267. <-job.disabled
  268. })
  269. })
  270. })
  271. }
  272. func TestConcurrentMirrorJobs(t *testing.T) {
  273. InitLogger(true, true, false)
  274. Convey("Concurrent MirrorJobs should work", t, func(ctx C) {
  275. tmpDir, err := ioutil.TempDir("", "tunasync")
  276. defer os.RemoveAll(tmpDir)
  277. So(err, ShouldBeNil)
  278. const CONCURRENT = 5
  279. var providers [CONCURRENT]*cmdProvider
  280. var jobs [CONCURRENT]*mirrorJob
  281. for i := 0; i < CONCURRENT; i++ {
  282. c := cmdConfig{
  283. name: fmt.Sprintf("job-%d", i),
  284. upstreamURL: "http://mirrors.tuna.moe/",
  285. command: "sleep 2",
  286. workingDir: tmpDir,
  287. logDir: tmpDir,
  288. logFile: "/dev/null",
  289. interval: 10 * time.Second,
  290. }
  291. var err error
  292. providers[i], err = newCmdProvider(c)
  293. So(err, ShouldBeNil)
  294. jobs[i] = newMirrorJob(providers[i])
  295. }
  296. managerChan := make(chan jobMessage, 10)
  297. semaphore := make(chan empty, CONCURRENT-2)
  298. countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) {
  299. counterEnded := 0
  300. counterRunning := 0
  301. peakConcurrent = 0
  302. counterFailed = 0
  303. for counterEnded < totalJobs {
  304. msg := <-managerChan
  305. switch msg.status {
  306. case PreSyncing:
  307. counterRunning++
  308. case Syncing:
  309. case Failed:
  310. counterFailed++
  311. fallthrough
  312. case Success:
  313. counterEnded++
  314. counterRunning--
  315. default:
  316. So(0, ShouldEqual, 1)
  317. }
  318. // Test if semaphore works
  319. So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck)
  320. if counterRunning > peakConcurrent {
  321. peakConcurrent = counterRunning
  322. }
  323. }
  324. // select {
  325. // case msg := <-managerChan:
  326. // logger.Errorf("extra message received: %v", msg)
  327. // So(0, ShouldEqual, 1)
  328. // case <-time.After(2 * time.Second):
  329. // }
  330. return
  331. }
  332. Convey("When we run them all", func(ctx C) {
  333. for _, job := range jobs {
  334. go job.Run(managerChan, semaphore)
  335. job.ctrlChan <- jobStart
  336. }
  337. peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
  338. So(peakConcurrent, ShouldEqual, CONCURRENT-2)
  339. So(counterFailed, ShouldEqual, 0)
  340. for _, job := range jobs {
  341. job.ctrlChan <- jobDisable
  342. <-job.disabled
  343. }
  344. })
  345. Convey("If we cancel one job", func(ctx C) {
  346. for _, job := range jobs {
  347. go job.Run(managerChan, semaphore)
  348. job.ctrlChan <- jobRestart
  349. time.Sleep(200 * time.Millisecond)
  350. }
  351. // Cancel the one waiting for semaphore
  352. jobs[len(jobs)-1].ctrlChan <- jobStop
  353. peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2)
  354. So(peakConcurrent, ShouldEqual, CONCURRENT-2)
  355. So(counterFailed, ShouldEqual, 0)
  356. for _, job := range jobs {
  357. job.ctrlChan <- jobDisable
  358. <-job.disabled
  359. }
  360. })
  361. Convey("If we override the concurrent limit", func(ctx C) {
  362. for _, job := range jobs {
  363. go job.Run(managerChan, semaphore)
  364. job.ctrlChan <- jobStart
  365. time.Sleep(200 * time.Millisecond)
  366. }
  367. jobs[len(jobs)-1].ctrlChan <- jobForceStart
  368. jobs[len(jobs)-2].ctrlChan <- jobForceStart
  369. peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT)
  370. So(peakConcurrent, ShouldEqual, CONCURRENT)
  371. So(counterFailed, ShouldEqual, 0)
  372. time.Sleep(1 * time.Second)
  373. // fmt.Println("Restart them")
  374. for _, job := range jobs {
  375. job.ctrlChan <- jobStart
  376. }
  377. peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
  378. So(peakConcurrent, ShouldEqual, CONCURRENT-2)
  379. So(counterFailed, ShouldEqual, 0)
  380. for _, job := range jobs {
  381. job.ctrlChan <- jobDisable
  382. <-job.disabled
  383. }
  384. })
  385. })
  386. }