2
0

job_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. package worker
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "os"
  6. "path/filepath"
  7. "testing"
  8. "time"
  9. . "github.com/smartystreets/goconvey/convey"
  10. . "github.com/tuna/tunasync/internal"
  11. )
  12. func TestMirrorJob(t *testing.T) {
  13. InitLogger(true, true, false)
  14. Convey("MirrorJob should work", t, func(ctx C) {
  15. tmpDir, err := ioutil.TempDir("", "tunasync")
  16. defer os.RemoveAll(tmpDir)
  17. So(err, ShouldBeNil)
  18. scriptFile := filepath.Join(tmpDir, "cmd.sh")
  19. tmpFile := filepath.Join(tmpDir, "log_file")
  20. c := cmdConfig{
  21. name: "tuna-cmd-jobtest",
  22. upstreamURL: "http://mirrors.tuna.moe/",
  23. command: "bash " + scriptFile,
  24. workingDir: tmpDir,
  25. logDir: tmpDir,
  26. logFile: tmpFile,
  27. interval: 1 * time.Second,
  28. timeout: 7 * time.Second,
  29. }
  30. provider, err := newCmdProvider(c)
  31. So(err, ShouldBeNil)
  32. So(provider.Name(), ShouldEqual, c.name)
  33. So(provider.WorkingDir(), ShouldEqual, c.workingDir)
  34. So(provider.LogDir(), ShouldEqual, c.logDir)
  35. So(provider.LogFile(), ShouldEqual, c.logFile)
  36. So(provider.Interval(), ShouldEqual, c.interval)
  37. So(provider.Timeout(), ShouldEqual, c.timeout)
  38. Convey("For a normal mirror job", func(ctx C) {
  39. scriptContent := `#!/bin/bash
  40. echo $TUNASYNC_WORKING_DIR
  41. echo $TUNASYNC_MIRROR_NAME
  42. echo $TUNASYNC_UPSTREAM_URL
  43. echo $TUNASYNC_LOG_FILE
  44. `
  45. expectedOutput := fmt.Sprintf(
  46. "%s\n%s\n%s\n%s\n",
  47. provider.WorkingDir(),
  48. provider.Name(),
  49. provider.upstreamURL,
  50. provider.LogFile(),
  51. )
  52. err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
  53. So(err, ShouldBeNil)
  54. readedScriptContent, err := ioutil.ReadFile(scriptFile)
  55. So(err, ShouldBeNil)
  56. So(readedScriptContent, ShouldResemble, []byte(scriptContent))
  57. Convey("If we let it run several times", func(ctx C) {
  58. managerChan := make(chan jobMessage, 10)
  59. semaphore := make(chan empty, 1)
  60. job := newMirrorJob(provider)
  61. go job.Run(managerChan, semaphore)
  62. // job should not start if we don't start it
  63. select {
  64. case <-managerChan:
  65. So(0, ShouldEqual, 1) // made this fail
  66. case <-time.After(1 * time.Second):
  67. So(0, ShouldEqual, 0)
  68. }
  69. job.ctrlChan <- jobStart
  70. for i := 0; i < 2; i++ {
  71. msg := <-managerChan
  72. So(msg.status, ShouldEqual, PreSyncing)
  73. msg = <-managerChan
  74. So(msg.status, ShouldEqual, Syncing)
  75. msg = <-managerChan
  76. So(msg.status, ShouldEqual, Success)
  77. loggedContent, err := ioutil.ReadFile(provider.LogFile())
  78. So(err, ShouldBeNil)
  79. So(string(loggedContent), ShouldEqual, expectedOutput)
  80. job.ctrlChan <- jobStart
  81. }
  82. select {
  83. case msg := <-managerChan:
  84. So(msg.status, ShouldEqual, PreSyncing)
  85. msg = <-managerChan
  86. So(msg.status, ShouldEqual, Syncing)
  87. msg = <-managerChan
  88. So(msg.status, ShouldEqual, Success)
  89. case <-time.After(2 * time.Second):
  90. So(0, ShouldEqual, 1)
  91. }
  92. job.ctrlChan <- jobDisable
  93. select {
  94. case <-managerChan:
  95. So(0, ShouldEqual, 1) // made this fail
  96. case <-job.disabled:
  97. So(0, ShouldEqual, 0)
  98. }
  99. })
  100. })
  101. Convey("When running long jobs with post-fail hook", func(ctx C) {
  102. scriptContent := `#!/bin/bash
  103. echo '++++++'
  104. echo $TUNASYNC_WORKING_DIR
  105. echo $0 sleeping
  106. sleep 3
  107. echo $TUNASYNC_WORKING_DIR
  108. echo '------'
  109. `
  110. err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
  111. So(err, ShouldBeNil)
  112. hookScriptFile := filepath.Join(tmpDir, "hook.sh")
  113. err = ioutil.WriteFile(hookScriptFile, []byte(scriptContent), 0755)
  114. So(err, ShouldBeNil)
  115. h, err := newExecPostHook(provider, execOnFailure, hookScriptFile)
  116. So(err, ShouldBeNil)
  117. provider.AddHook(h)
  118. managerChan := make(chan jobMessage, 10)
  119. semaphore := make(chan empty, 1)
  120. job := newMirrorJob(provider)
  121. Convey("If we kill it", func(ctx C) {
  122. go job.Run(managerChan, semaphore)
  123. job.ctrlChan <- jobStart
  124. time.Sleep(1 * time.Second)
  125. msg := <-managerChan
  126. So(msg.status, ShouldEqual, PreSyncing)
  127. msg = <-managerChan
  128. So(msg.status, ShouldEqual, Syncing)
  129. job.ctrlChan <- jobStop
  130. msg = <-managerChan
  131. So(msg.status, ShouldEqual, Failed)
  132. job.ctrlChan <- jobDisable
  133. <-job.disabled
  134. })
  135. Convey("If we kill it then start it", func(ctx C) {
  136. go job.Run(managerChan, semaphore)
  137. job.ctrlChan <- jobStart
  138. time.Sleep(1 * time.Second)
  139. msg := <-managerChan
  140. So(msg.status, ShouldEqual, PreSyncing)
  141. msg = <-managerChan
  142. So(msg.status, ShouldEqual, Syncing)
  143. job.ctrlChan <- jobStop
  144. time.Sleep(2 * time.Second)
  145. logger.Debugf("Now starting...\n")
  146. job.ctrlChan <- jobStart
  147. msg = <-managerChan
  148. So(msg.status, ShouldEqual, Failed)
  149. job.ctrlChan <- jobDisable
  150. <-job.disabled
  151. })
  152. })
  153. Convey("When running long jobs", func(ctx C) {
  154. scriptContent := `#!/bin/bash
  155. echo $TUNASYNC_WORKING_DIR
  156. sleep 5
  157. echo $TUNASYNC_WORKING_DIR
  158. `
  159. err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
  160. So(err, ShouldBeNil)
  161. managerChan := make(chan jobMessage, 10)
  162. semaphore := make(chan empty, 1)
  163. job := newMirrorJob(provider)
  164. Convey("If we kill it", func(ctx C) {
  165. go job.Run(managerChan, semaphore)
  166. job.ctrlChan <- jobStart
  167. time.Sleep(1 * time.Second)
  168. msg := <-managerChan
  169. So(msg.status, ShouldEqual, PreSyncing)
  170. msg = <-managerChan
  171. So(msg.status, ShouldEqual, Syncing)
  172. job.ctrlChan <- jobStart // should be ignored
  173. job.ctrlChan <- jobStop
  174. msg = <-managerChan
  175. So(msg.status, ShouldEqual, Failed)
  176. expectedOutput := fmt.Sprintf("%s\n", provider.WorkingDir())
  177. loggedContent, err := ioutil.ReadFile(provider.LogFile())
  178. So(err, ShouldBeNil)
  179. So(string(loggedContent), ShouldEqual, expectedOutput)
  180. job.ctrlChan <- jobDisable
  181. <-job.disabled
  182. })
  183. Convey("If we don't kill it", func(ctx C) {
  184. go job.Run(managerChan, semaphore)
  185. job.ctrlChan <- jobStart
  186. msg := <-managerChan
  187. So(msg.status, ShouldEqual, PreSyncing)
  188. msg = <-managerChan
  189. So(msg.status, ShouldEqual, Syncing)
  190. msg = <-managerChan
  191. So(msg.status, ShouldEqual, Success)
  192. expectedOutput := fmt.Sprintf(
  193. "%s\n%s\n",
  194. provider.WorkingDir(), provider.WorkingDir(),
  195. )
  196. loggedContent, err := ioutil.ReadFile(provider.LogFile())
  197. So(err, ShouldBeNil)
  198. So(string(loggedContent), ShouldEqual, expectedOutput)
  199. job.ctrlChan <- jobDisable
  200. <-job.disabled
  201. })
  202. Convey("If we restart it", func(ctx C) {
  203. go job.Run(managerChan, semaphore)
  204. job.ctrlChan <- jobStart
  205. msg := <-managerChan
  206. So(msg.status, ShouldEqual, PreSyncing)
  207. msg = <-managerChan
  208. So(msg.status, ShouldEqual, Syncing)
  209. job.ctrlChan <- jobRestart
  210. msg = <-managerChan
  211. So(msg.status, ShouldEqual, Failed)
  212. So(msg.msg, ShouldEqual, "killed by manager")
  213. msg = <-managerChan
  214. So(msg.status, ShouldEqual, PreSyncing)
  215. msg = <-managerChan
  216. So(msg.status, ShouldEqual, Syncing)
  217. msg = <-managerChan
  218. So(msg.status, ShouldEqual, Success)
  219. expectedOutput := fmt.Sprintf(
  220. "%s\n%s\n",
  221. provider.WorkingDir(), provider.WorkingDir(),
  222. )
  223. loggedContent, err := ioutil.ReadFile(provider.LogFile())
  224. So(err, ShouldBeNil)
  225. So(string(loggedContent), ShouldEqual, expectedOutput)
  226. job.ctrlChan <- jobDisable
  227. <-job.disabled
  228. })
  229. Convey("If we disable it", func(ctx C) {
  230. go job.Run(managerChan, semaphore)
  231. job.ctrlChan <- jobStart
  232. msg := <-managerChan
  233. So(msg.status, ShouldEqual, PreSyncing)
  234. msg = <-managerChan
  235. So(msg.status, ShouldEqual, Syncing)
  236. job.ctrlChan <- jobDisable
  237. msg = <-managerChan
  238. So(msg.status, ShouldEqual, Failed)
  239. So(msg.msg, ShouldEqual, "killed by manager")
  240. <-job.disabled
  241. })
  242. Convey("If we stop it twice, than start it", func(ctx C) {
  243. go job.Run(managerChan, semaphore)
  244. job.ctrlChan <- jobStart
  245. msg := <-managerChan
  246. So(msg.status, ShouldEqual, PreSyncing)
  247. msg = <-managerChan
  248. So(msg.status, ShouldEqual, Syncing)
  249. job.ctrlChan <- jobStop
  250. msg = <-managerChan
  251. So(msg.status, ShouldEqual, Failed)
  252. So(msg.msg, ShouldEqual, "killed by manager")
  253. job.ctrlChan <- jobStop // should be ignored
  254. job.ctrlChan <- jobStart
  255. msg = <-managerChan
  256. So(msg.status, ShouldEqual, PreSyncing)
  257. msg = <-managerChan
  258. So(msg.status, ShouldEqual, Syncing)
  259. msg = <-managerChan
  260. So(msg.status, ShouldEqual, Success)
  261. expectedOutput := fmt.Sprintf(
  262. "%s\n%s\n",
  263. provider.WorkingDir(), provider.WorkingDir(),
  264. )
  265. loggedContent, err := ioutil.ReadFile(provider.LogFile())
  266. So(err, ShouldBeNil)
  267. So(string(loggedContent), ShouldEqual, expectedOutput)
  268. job.ctrlChan <- jobDisable
  269. <-job.disabled
  270. })
  271. })
  272. Convey("When a job timed out", func(ctx C) {
  273. scriptContent := `#!/bin/bash
  274. echo $TUNASYNC_WORKING_DIR
  275. sleep 10
  276. echo $TUNASYNC_WORKING_DIR
  277. `
  278. err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
  279. So(err, ShouldBeNil)
  280. managerChan := make(chan jobMessage, 10)
  281. semaphore := make(chan empty, 1)
  282. job := newMirrorJob(provider)
  283. Convey("It should be automatically terminated", func(ctx C) {
  284. go job.Run(managerChan, semaphore)
  285. job.ctrlChan <- jobStart
  286. time.Sleep(1 * time.Second)
  287. msg := <-managerChan
  288. So(msg.status, ShouldEqual, PreSyncing)
  289. msg = <-managerChan
  290. So(msg.status, ShouldEqual, Syncing)
  291. job.ctrlChan <- jobStart // should be ignored
  292. msg = <-managerChan
  293. So(msg.status, ShouldEqual, Failed)
  294. expectedOutput := fmt.Sprintf("%s\n", provider.WorkingDir())
  295. loggedContent, err := ioutil.ReadFile(provider.LogFile())
  296. So(err, ShouldBeNil)
  297. So(string(loggedContent), ShouldEqual, expectedOutput)
  298. job.ctrlChan <- jobDisable
  299. <-job.disabled
  300. })
  301. })
  302. })
  303. }
  304. func TestConcurrentMirrorJobs(t *testing.T) {
  305. InitLogger(true, true, false)
  306. Convey("Concurrent MirrorJobs should work", t, func(ctx C) {
  307. tmpDir, err := ioutil.TempDir("", "tunasync")
  308. defer os.RemoveAll(tmpDir)
  309. So(err, ShouldBeNil)
  310. const CONCURRENT = 5
  311. var providers [CONCURRENT]*cmdProvider
  312. var jobs [CONCURRENT]*mirrorJob
  313. for i := 0; i < CONCURRENT; i++ {
  314. c := cmdConfig{
  315. name: fmt.Sprintf("job-%d", i),
  316. upstreamURL: "http://mirrors.tuna.moe/",
  317. command: "sleep 2",
  318. workingDir: tmpDir,
  319. logDir: tmpDir,
  320. logFile: "/dev/null",
  321. interval: 10 * time.Second,
  322. }
  323. var err error
  324. providers[i], err = newCmdProvider(c)
  325. So(err, ShouldBeNil)
  326. jobs[i] = newMirrorJob(providers[i])
  327. }
  328. managerChan := make(chan jobMessage, 10)
  329. semaphore := make(chan empty, CONCURRENT-2)
  330. countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) {
  331. counterEnded := 0
  332. counterRunning := 0
  333. peakConcurrent = 0
  334. counterFailed = 0
  335. for counterEnded < totalJobs {
  336. msg := <-managerChan
  337. switch msg.status {
  338. case PreSyncing:
  339. counterRunning++
  340. case Syncing:
  341. case Failed:
  342. counterFailed++
  343. fallthrough
  344. case Success:
  345. counterEnded++
  346. counterRunning--
  347. default:
  348. So(0, ShouldEqual, 1)
  349. }
  350. // Test if semaphore works
  351. So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck)
  352. if counterRunning > peakConcurrent {
  353. peakConcurrent = counterRunning
  354. }
  355. }
  356. // select {
  357. // case msg := <-managerChan:
  358. // logger.Errorf("extra message received: %v", msg)
  359. // So(0, ShouldEqual, 1)
  360. // case <-time.After(2 * time.Second):
  361. // }
  362. return
  363. }
  364. Convey("When we run them all", func(ctx C) {
  365. for _, job := range jobs {
  366. go job.Run(managerChan, semaphore)
  367. job.ctrlChan <- jobStart
  368. }
  369. peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
  370. So(peakConcurrent, ShouldEqual, CONCURRENT-2)
  371. So(counterFailed, ShouldEqual, 0)
  372. for _, job := range jobs {
  373. job.ctrlChan <- jobDisable
  374. <-job.disabled
  375. }
  376. })
  377. Convey("If we cancel one job", func(ctx C) {
  378. for _, job := range jobs {
  379. go job.Run(managerChan, semaphore)
  380. job.ctrlChan <- jobRestart
  381. time.Sleep(200 * time.Millisecond)
  382. }
  383. // Cancel the one waiting for semaphore
  384. jobs[len(jobs)-1].ctrlChan <- jobStop
  385. peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2)
  386. So(peakConcurrent, ShouldEqual, CONCURRENT-2)
  387. So(counterFailed, ShouldEqual, 0)
  388. for _, job := range jobs {
  389. job.ctrlChan <- jobDisable
  390. <-job.disabled
  391. }
  392. })
  393. Convey("If we override the concurrent limit", func(ctx C) {
  394. for _, job := range jobs {
  395. go job.Run(managerChan, semaphore)
  396. job.ctrlChan <- jobStart
  397. time.Sleep(200 * time.Millisecond)
  398. }
  399. jobs[len(jobs)-1].ctrlChan <- jobForceStart
  400. jobs[len(jobs)-2].ctrlChan <- jobForceStart
  401. peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT)
  402. So(peakConcurrent, ShouldEqual, CONCURRENT)
  403. So(counterFailed, ShouldEqual, 0)
  404. time.Sleep(1 * time.Second)
  405. // fmt.Println("Restart them")
  406. for _, job := range jobs {
  407. job.ctrlChan <- jobStart
  408. }
  409. peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
  410. So(peakConcurrent, ShouldEqual, CONCURRENT-2)
  411. So(counterFailed, ShouldEqual, 0)
  412. for _, job := range jobs {
  413. job.ctrlChan <- jobDisable
  414. <-job.disabled
  415. }
  416. })
  417. })
  418. }