runner.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. "os"
  6. "os/exec"
  7. "strings"
  8. "sync"
  9. "syscall"
  10. "time"
  11. "github.com/codeskyblue/go-sh"
  12. "golang.org/x/sys/unix"
  13. )
  14. // runner is to run os commands giving command line, env and log file
  15. // it's an alternative to python-sh or go-sh
  16. var errProcessNotStarted = errors.New("Process Not Started")
  17. type cmdJob struct {
  18. sync.Mutex
  19. cmd *exec.Cmd
  20. workingDir string
  21. env map[string]string
  22. logFile *os.File
  23. finished chan empty
  24. provider mirrorProvider
  25. retErr error
  26. }
  27. func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob {
  28. var cmd *exec.Cmd
  29. if d := provider.Docker(); d != nil {
  30. c := "docker"
  31. args := []string{
  32. "run", "--rm",
  33. "-a", "STDOUT", "-a", "STDERR",
  34. "--name", d.Name(),
  35. "-w", workingDir,
  36. }
  37. // specify user
  38. args = append(
  39. args, "-u",
  40. fmt.Sprintf("%d:%d", os.Getuid(), os.Getgid()),
  41. )
  42. // add volumes
  43. for _, vol := range d.Volumes() {
  44. logger.Debugf("volume: %s", vol)
  45. args = append(args, "-v", vol)
  46. }
  47. // set env
  48. for k, v := range env {
  49. kv := fmt.Sprintf("%s=%s", k, v)
  50. args = append(args, "-e", kv)
  51. }
  52. // set memlimit
  53. if d.memoryLimit != 0 {
  54. args = append(args, "-m", fmt.Sprint(d.memoryLimit.Value()))
  55. }
  56. // apply options
  57. args = append(args, d.options...)
  58. // apply image and command
  59. args = append(args, d.image)
  60. // apply command
  61. args = append(args, cmdAndArgs...)
  62. cmd = exec.Command(c, args...)
  63. } else if provider.Cgroup() != nil {
  64. //c := "cgexec"
  65. //args := []string{"-g", provider.Cgroup().Cgroup()}
  66. //args = append(args, cmdAndArgs...)
  67. //cmd = exec.Command(c, args...)
  68. cmd = exec.Command(cmdAndArgs[0], cmdAndArgs[1:]...)
  69. } else {
  70. if len(cmdAndArgs) == 1 {
  71. cmd = exec.Command(cmdAndArgs[0])
  72. } else if len(cmdAndArgs) > 1 {
  73. c := cmdAndArgs[0]
  74. args := cmdAndArgs[1:]
  75. cmd = exec.Command(c, args...)
  76. } else if len(cmdAndArgs) == 0 {
  77. panic("Command length should be at least 1!")
  78. }
  79. }
  80. if provider.Docker() == nil {
  81. logger.Debugf("Executing command %s at %s", cmdAndArgs[0], workingDir)
  82. if _, err := os.Stat(workingDir); os.IsNotExist(err) {
  83. logger.Debugf("Making dir %s", workingDir)
  84. if err = os.MkdirAll(workingDir, 0755); err != nil {
  85. logger.Errorf("Error making dir %s: %s", workingDir, err.Error())
  86. }
  87. }
  88. cmd.Dir = workingDir
  89. cmd.Env = newEnviron(env, true)
  90. }
  91. return &cmdJob{
  92. cmd: cmd,
  93. workingDir: workingDir,
  94. env: env,
  95. provider: provider,
  96. }
  97. }
  98. func (c *cmdJob) Start() error {
  99. logger.Debugf("Command start: %v", c.cmd.Args)
  100. c.finished = make(chan empty, 1)
  101. return c.cmd.Start()
  102. }
  103. func (c *cmdJob) Wait() error {
  104. c.Lock()
  105. defer c.Unlock()
  106. select {
  107. case <-c.finished:
  108. return c.retErr
  109. default:
  110. err := c.cmd.Wait()
  111. c.retErr = err
  112. close(c.finished)
  113. return err
  114. }
  115. }
  116. func (c *cmdJob) SetLogFile(logFile *os.File) {
  117. c.cmd.Stdout = logFile
  118. c.cmd.Stderr = logFile
  119. }
  120. func (c *cmdJob) Terminate() error {
  121. if c.cmd == nil || c.cmd.Process == nil {
  122. return errProcessNotStarted
  123. }
  124. if d := c.provider.Docker(); d != nil {
  125. sh.Command(
  126. "docker", "stop", "-t", "2", d.Name(),
  127. ).Run()
  128. return nil
  129. }
  130. err := unix.Kill(c.cmd.Process.Pid, syscall.SIGTERM)
  131. if err != nil {
  132. return err
  133. }
  134. select {
  135. case <-time.After(2 * time.Second):
  136. unix.Kill(c.cmd.Process.Pid, syscall.SIGKILL)
  137. logger.Warningf("SIGTERM failed to kill the job in 2s. SIGKILL sent")
  138. case <-c.finished:
  139. }
  140. return nil
  141. }
  142. // Copied from go-sh
  143. func newEnviron(env map[string]string, inherit bool) []string { //map[string]string {
  144. environ := make([]string, 0, len(env))
  145. if inherit {
  146. for _, line := range os.Environ() {
  147. // if os environment and env collapses,
  148. // omit the os one
  149. k := strings.Split(line, "=")[0]
  150. if _, ok := env[k]; ok {
  151. continue
  152. }
  153. environ = append(environ, line)
  154. }
  155. }
  156. for k, v := range env {
  157. environ = append(environ, k+"="+v)
  158. }
  159. return environ
  160. }