2
0

job_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. package worker
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "testing"
  7. "time"
  8. . "github.com/smartystreets/goconvey/convey"
  9. . "github.com/tuna/tunasync/internal"
  10. )
  11. func TestMirrorJob(t *testing.T) {
  12. InitLogger(true, true, false)
  13. Convey("MirrorJob should work", t, func(ctx C) {
  14. tmpDir, err := os.MkdirTemp("", "tunasync")
  15. defer os.RemoveAll(tmpDir)
  16. So(err, ShouldBeNil)
  17. scriptFile := filepath.Join(tmpDir, "cmd.sh")
  18. tmpFile := filepath.Join(tmpDir, "log_file")
  19. c := cmdConfig{
  20. name: "tuna-cmd-jobtest",
  21. upstreamURL: "http://mirrors.tuna.moe/",
  22. command: "bash " + scriptFile,
  23. workingDir: tmpDir,
  24. logDir: tmpDir,
  25. logFile: tmpFile,
  26. interval: 1 * time.Second,
  27. timeout: 7 * time.Second,
  28. }
  29. provider, err := newCmdProvider(c)
  30. So(err, ShouldBeNil)
  31. So(provider.Name(), ShouldEqual, c.name)
  32. So(provider.WorkingDir(), ShouldEqual, c.workingDir)
  33. So(provider.LogDir(), ShouldEqual, c.logDir)
  34. So(provider.LogFile(), ShouldEqual, c.logFile)
  35. So(provider.Interval(), ShouldEqual, c.interval)
  36. So(provider.Timeout(), ShouldEqual, c.timeout)
  37. Convey("For a normal mirror job", func(ctx C) {
  38. scriptContent := `#!/bin/bash
  39. echo $TUNASYNC_WORKING_DIR
  40. echo $TUNASYNC_MIRROR_NAME
  41. echo $TUNASYNC_UPSTREAM_URL
  42. echo $TUNASYNC_LOG_FILE
  43. `
  44. expectedOutput := fmt.Sprintf(
  45. "%s\n%s\n%s\n%s\n",
  46. provider.WorkingDir(),
  47. provider.Name(),
  48. provider.upstreamURL,
  49. provider.LogFile(),
  50. )
  51. err = os.WriteFile(scriptFile, []byte(scriptContent), 0755)
  52. So(err, ShouldBeNil)
  53. readedScriptContent, err := os.ReadFile(scriptFile)
  54. So(err, ShouldBeNil)
  55. So(readedScriptContent, ShouldResemble, []byte(scriptContent))
  56. Convey("If we let it run several times", func(ctx C) {
  57. managerChan := make(chan jobMessage, 10)
  58. semaphore := make(chan empty, 1)
  59. job := newMirrorJob(provider)
  60. go job.Run(managerChan, semaphore)
  61. // job should not start if we don't start it
  62. select {
  63. case <-managerChan:
  64. So(0, ShouldEqual, 1) // made this fail
  65. case <-time.After(1 * time.Second):
  66. So(0, ShouldEqual, 0)
  67. }
  68. job.ctrlChan <- jobStart
  69. for i := 0; i < 2; i++ {
  70. msg := <-managerChan
  71. So(msg.status, ShouldEqual, PreSyncing)
  72. msg = <-managerChan
  73. So(msg.status, ShouldEqual, Syncing)
  74. msg = <-managerChan
  75. So(msg.status, ShouldEqual, Success)
  76. loggedContent, err := os.ReadFile(provider.LogFile())
  77. So(err, ShouldBeNil)
  78. So(string(loggedContent), ShouldEqual, expectedOutput)
  79. job.ctrlChan <- jobStart
  80. }
  81. select {
  82. case msg := <-managerChan:
  83. So(msg.status, ShouldEqual, PreSyncing)
  84. msg = <-managerChan
  85. So(msg.status, ShouldEqual, Syncing)
  86. msg = <-managerChan
  87. So(msg.status, ShouldEqual, Success)
  88. case <-time.After(2 * time.Second):
  89. So(0, ShouldEqual, 1)
  90. }
  91. job.ctrlChan <- jobDisable
  92. select {
  93. case <-managerChan:
  94. So(0, ShouldEqual, 1) // made this fail
  95. case <-job.disabled:
  96. So(0, ShouldEqual, 0)
  97. }
  98. })
  99. })
  100. Convey("When running long jobs with post-fail hook", func(ctx C) {
  101. scriptContent := `#!/bin/bash
  102. echo '++++++'
  103. echo $TUNASYNC_WORKING_DIR
  104. echo $0 sleeping
  105. sleep 3
  106. echo $TUNASYNC_WORKING_DIR
  107. echo '------'
  108. `
  109. err = os.WriteFile(scriptFile, []byte(scriptContent), 0755)
  110. So(err, ShouldBeNil)
  111. hookScriptFile := filepath.Join(tmpDir, "hook.sh")
  112. err = os.WriteFile(hookScriptFile, []byte(scriptContent), 0755)
  113. So(err, ShouldBeNil)
  114. h, err := newExecPostHook(provider, execOnFailure, hookScriptFile)
  115. So(err, ShouldBeNil)
  116. provider.AddHook(h)
  117. managerChan := make(chan jobMessage, 10)
  118. semaphore := make(chan empty, 1)
  119. job := newMirrorJob(provider)
  120. Convey("If we kill it", func(ctx C) {
  121. go job.Run(managerChan, semaphore)
  122. job.ctrlChan <- jobStart
  123. time.Sleep(1 * time.Second)
  124. msg := <-managerChan
  125. So(msg.status, ShouldEqual, PreSyncing)
  126. msg = <-managerChan
  127. So(msg.status, ShouldEqual, Syncing)
  128. job.ctrlChan <- jobStop
  129. msg = <-managerChan
  130. So(msg.status, ShouldEqual, Failed)
  131. job.ctrlChan <- jobDisable
  132. <-job.disabled
  133. })
  134. Convey("If we kill it then start it", func(ctx C) {
  135. go job.Run(managerChan, semaphore)
  136. job.ctrlChan <- jobStart
  137. time.Sleep(1 * time.Second)
  138. msg := <-managerChan
  139. So(msg.status, ShouldEqual, PreSyncing)
  140. msg = <-managerChan
  141. So(msg.status, ShouldEqual, Syncing)
  142. job.ctrlChan <- jobStop
  143. time.Sleep(2 * time.Second)
  144. logger.Debugf("Now starting...\n")
  145. job.ctrlChan <- jobStart
  146. msg = <-managerChan
  147. So(msg.status, ShouldEqual, Failed)
  148. job.ctrlChan <- jobDisable
  149. <-job.disabled
  150. })
  151. })
  152. Convey("When running long jobs", func(ctx C) {
  153. scriptContent := `#!/bin/bash
  154. echo $TUNASYNC_WORKING_DIR
  155. sleep 5
  156. echo $TUNASYNC_WORKING_DIR
  157. `
  158. err = os.WriteFile(scriptFile, []byte(scriptContent), 0755)
  159. So(err, ShouldBeNil)
  160. managerChan := make(chan jobMessage, 10)
  161. semaphore := make(chan empty, 1)
  162. job := newMirrorJob(provider)
  163. Convey("If we kill it", func(ctx C) {
  164. go job.Run(managerChan, semaphore)
  165. job.ctrlChan <- jobStart
  166. time.Sleep(1 * time.Second)
  167. msg := <-managerChan
  168. So(msg.status, ShouldEqual, PreSyncing)
  169. msg = <-managerChan
  170. So(msg.status, ShouldEqual, Syncing)
  171. job.ctrlChan <- jobStart // should be ignored
  172. job.ctrlChan <- jobStop
  173. msg = <-managerChan
  174. So(msg.status, ShouldEqual, Failed)
  175. expectedOutput := fmt.Sprintf("%s\n", provider.WorkingDir())
  176. loggedContent, err := os.ReadFile(provider.LogFile())
  177. So(err, ShouldBeNil)
  178. So(string(loggedContent), ShouldEqual, expectedOutput)
  179. job.ctrlChan <- jobDisable
  180. <-job.disabled
  181. })
  182. Convey("If we don't kill it", func(ctx C) {
  183. go job.Run(managerChan, semaphore)
  184. job.ctrlChan <- jobStart
  185. msg := <-managerChan
  186. So(msg.status, ShouldEqual, PreSyncing)
  187. msg = <-managerChan
  188. So(msg.status, ShouldEqual, Syncing)
  189. msg = <-managerChan
  190. So(msg.status, ShouldEqual, Success)
  191. expectedOutput := fmt.Sprintf(
  192. "%s\n%s\n",
  193. provider.WorkingDir(), provider.WorkingDir(),
  194. )
  195. loggedContent, err := os.ReadFile(provider.LogFile())
  196. So(err, ShouldBeNil)
  197. So(string(loggedContent), ShouldEqual, expectedOutput)
  198. job.ctrlChan <- jobDisable
  199. <-job.disabled
  200. })
  201. Convey("If we restart it", func(ctx C) {
  202. go job.Run(managerChan, semaphore)
  203. job.ctrlChan <- jobStart
  204. msg := <-managerChan
  205. So(msg.status, ShouldEqual, PreSyncing)
  206. msg = <-managerChan
  207. So(msg.status, ShouldEqual, Syncing)
  208. job.ctrlChan <- jobRestart
  209. msg = <-managerChan
  210. So(msg.status, ShouldEqual, Failed)
  211. So(msg.msg, ShouldEqual, "killed by manager")
  212. msg = <-managerChan
  213. So(msg.status, ShouldEqual, PreSyncing)
  214. msg = <-managerChan
  215. So(msg.status, ShouldEqual, Syncing)
  216. msg = <-managerChan
  217. So(msg.status, ShouldEqual, Success)
  218. expectedOutput := fmt.Sprintf(
  219. "%s\n%s\n",
  220. provider.WorkingDir(), provider.WorkingDir(),
  221. )
  222. loggedContent, err := os.ReadFile(provider.LogFile())
  223. So(err, ShouldBeNil)
  224. So(string(loggedContent), ShouldEqual, expectedOutput)
  225. job.ctrlChan <- jobDisable
  226. <-job.disabled
  227. })
  228. Convey("If we disable it", func(ctx C) {
  229. go job.Run(managerChan, semaphore)
  230. job.ctrlChan <- jobStart
  231. msg := <-managerChan
  232. So(msg.status, ShouldEqual, PreSyncing)
  233. msg = <-managerChan
  234. So(msg.status, ShouldEqual, Syncing)
  235. job.ctrlChan <- jobDisable
  236. msg = <-managerChan
  237. So(msg.status, ShouldEqual, Failed)
  238. So(msg.msg, ShouldEqual, "killed by manager")
  239. <-job.disabled
  240. })
  241. Convey("If we stop it twice, than start it", func(ctx C) {
  242. go job.Run(managerChan, semaphore)
  243. job.ctrlChan <- jobStart
  244. msg := <-managerChan
  245. So(msg.status, ShouldEqual, PreSyncing)
  246. msg = <-managerChan
  247. So(msg.status, ShouldEqual, Syncing)
  248. job.ctrlChan <- jobStop
  249. msg = <-managerChan
  250. So(msg.status, ShouldEqual, Failed)
  251. So(msg.msg, ShouldEqual, "killed by manager")
  252. job.ctrlChan <- jobStop // should be ignored
  253. job.ctrlChan <- jobStart
  254. msg = <-managerChan
  255. So(msg.status, ShouldEqual, PreSyncing)
  256. msg = <-managerChan
  257. So(msg.status, ShouldEqual, Syncing)
  258. msg = <-managerChan
  259. So(msg.status, ShouldEqual, Success)
  260. expectedOutput := fmt.Sprintf(
  261. "%s\n%s\n",
  262. provider.WorkingDir(), provider.WorkingDir(),
  263. )
  264. loggedContent, err := os.ReadFile(provider.LogFile())
  265. So(err, ShouldBeNil)
  266. So(string(loggedContent), ShouldEqual, expectedOutput)
  267. job.ctrlChan <- jobDisable
  268. <-job.disabled
  269. })
  270. })
  271. Convey("When a job timed out", func(ctx C) {
  272. scriptContent := `#!/bin/bash
  273. echo $TUNASYNC_WORKING_DIR
  274. sleep 10
  275. echo $TUNASYNC_WORKING_DIR
  276. `
  277. err = os.WriteFile(scriptFile, []byte(scriptContent), 0755)
  278. So(err, ShouldBeNil)
  279. managerChan := make(chan jobMessage, 10)
  280. semaphore := make(chan empty, 1)
  281. job := newMirrorJob(provider)
  282. Convey("It should be automatically terminated", func(ctx C) {
  283. go job.Run(managerChan, semaphore)
  284. job.ctrlChan <- jobStart
  285. time.Sleep(1 * time.Second)
  286. msg := <-managerChan
  287. So(msg.status, ShouldEqual, PreSyncing)
  288. msg = <-managerChan
  289. So(msg.status, ShouldEqual, Syncing)
  290. job.ctrlChan <- jobStart // should be ignored
  291. msg = <-managerChan
  292. So(msg.status, ShouldEqual, Failed)
  293. expectedOutput := fmt.Sprintf("%s\n", provider.WorkingDir())
  294. loggedContent, err := os.ReadFile(provider.LogFile())
  295. So(err, ShouldBeNil)
  296. So(string(loggedContent), ShouldEqual, expectedOutput)
  297. job.ctrlChan <- jobDisable
  298. <-job.disabled
  299. })
  300. Convey("It should be retried", func(ctx C) {
  301. go job.Run(managerChan, semaphore)
  302. job.ctrlChan <- jobStart
  303. time.Sleep(1 * time.Second)
  304. msg := <-managerChan
  305. So(msg.status, ShouldEqual, PreSyncing)
  306. for i := 0; i < defaultMaxRetry; i++ {
  307. msg = <-managerChan
  308. So(msg.status, ShouldEqual, Syncing)
  309. job.ctrlChan <- jobStart // should be ignored
  310. msg = <-managerChan
  311. So(msg.status, ShouldEqual, Failed)
  312. So(msg.msg, ShouldContainSubstring, "timeout after")
  313. // re-schedule after last try
  314. So(msg.schedule, ShouldEqual, i == defaultMaxRetry-1)
  315. }
  316. job.ctrlChan <- jobDisable
  317. <-job.disabled
  318. })
  319. })
  320. })
  321. }
  322. func TestConcurrentMirrorJobs(t *testing.T) {
  323. InitLogger(true, true, false)
  324. Convey("Concurrent MirrorJobs should work", t, func(ctx C) {
  325. tmpDir, err := os.MkdirTemp("", "tunasync")
  326. defer os.RemoveAll(tmpDir)
  327. So(err, ShouldBeNil)
  328. const CONCURRENT = 5
  329. var providers [CONCURRENT]*cmdProvider
  330. var jobs [CONCURRENT]*mirrorJob
  331. for i := 0; i < CONCURRENT; i++ {
  332. c := cmdConfig{
  333. name: fmt.Sprintf("job-%d", i),
  334. upstreamURL: "http://mirrors.tuna.moe/",
  335. command: "sleep 2",
  336. workingDir: tmpDir,
  337. logDir: tmpDir,
  338. logFile: "/dev/null",
  339. interval: 10 * time.Second,
  340. }
  341. var err error
  342. providers[i], err = newCmdProvider(c)
  343. So(err, ShouldBeNil)
  344. jobs[i] = newMirrorJob(providers[i])
  345. }
  346. managerChan := make(chan jobMessage, 10)
  347. semaphore := make(chan empty, CONCURRENT-2)
  348. countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) {
  349. counterEnded := 0
  350. counterRunning := 0
  351. peakConcurrent = 0
  352. counterFailed = 0
  353. for counterEnded < totalJobs {
  354. msg := <-managerChan
  355. switch msg.status {
  356. case PreSyncing:
  357. counterRunning++
  358. case Syncing:
  359. case Failed:
  360. counterFailed++
  361. fallthrough
  362. case Success:
  363. counterEnded++
  364. counterRunning--
  365. default:
  366. So(0, ShouldEqual, 1)
  367. }
  368. // Test if semaphore works
  369. So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck)
  370. if counterRunning > peakConcurrent {
  371. peakConcurrent = counterRunning
  372. }
  373. }
  374. // select {
  375. // case msg := <-managerChan:
  376. // logger.Errorf("extra message received: %v", msg)
  377. // So(0, ShouldEqual, 1)
  378. // case <-time.After(2 * time.Second):
  379. // }
  380. return
  381. }
  382. Convey("When we run them all", func(ctx C) {
  383. for _, job := range jobs {
  384. go job.Run(managerChan, semaphore)
  385. job.ctrlChan <- jobStart
  386. }
  387. peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
  388. So(peakConcurrent, ShouldEqual, CONCURRENT-2)
  389. So(counterFailed, ShouldEqual, 0)
  390. for _, job := range jobs {
  391. job.ctrlChan <- jobDisable
  392. <-job.disabled
  393. }
  394. })
  395. Convey("If we cancel one job", func(ctx C) {
  396. for _, job := range jobs {
  397. go job.Run(managerChan, semaphore)
  398. job.ctrlChan <- jobRestart
  399. time.Sleep(200 * time.Millisecond)
  400. }
  401. // Cancel the one waiting for semaphore
  402. jobs[len(jobs)-1].ctrlChan <- jobStop
  403. peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2)
  404. So(peakConcurrent, ShouldEqual, CONCURRENT-2)
  405. So(counterFailed, ShouldEqual, 0)
  406. for _, job := range jobs {
  407. job.ctrlChan <- jobDisable
  408. <-job.disabled
  409. }
  410. })
  411. Convey("If we override the concurrent limit", func(ctx C) {
  412. for _, job := range jobs {
  413. go job.Run(managerChan, semaphore)
  414. job.ctrlChan <- jobStart
  415. time.Sleep(200 * time.Millisecond)
  416. }
  417. jobs[len(jobs)-1].ctrlChan <- jobForceStart
  418. jobs[len(jobs)-2].ctrlChan <- jobForceStart
  419. peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT)
  420. So(peakConcurrent, ShouldEqual, CONCURRENT)
  421. So(counterFailed, ShouldEqual, 0)
  422. time.Sleep(1 * time.Second)
  423. // fmt.Println("Restart them")
  424. for _, job := range jobs {
  425. job.ctrlChan <- jobStart
  426. }
  427. peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
  428. So(peakConcurrent, ShouldEqual, CONCURRENT-2)
  429. So(counterFailed, ShouldEqual, 0)
  430. for _, job := range jobs {
  431. job.ctrlChan <- jobDisable
  432. <-job.disabled
  433. }
  434. })
  435. })
  436. }