runner.go 4.9 KB


  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. "os"
  6. "os/exec"
  7. "strings"
  8. "sync"
  9. "syscall"
  10. "time"
  11. "github.com/codeskyblue/go-sh"
  12. "golang.org/x/sys/unix"
  13. "github.com/moby/moby/pkg/reexec"
  14. cgv1 "github.com/containerd/cgroups"
  15. )
  16. // runner is to run os commands giving command line, env and log file
  17. // it's an alternative to python-sh or go-sh
  18. var errProcessNotStarted = errors.New("Process Not Started")
  19. type cmdJob struct {
  20. sync.Mutex
  21. cmd *exec.Cmd
  22. workingDir string
  23. env map[string]string
  24. logFile *os.File
  25. finished chan empty
  26. provider mirrorProvider
  27. retErr error
  28. }
  29. func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob {
  30. var cmd *exec.Cmd
  31. if d := provider.Docker(); d != nil {
  32. c := "docker"
  33. args := []string{
  34. "run", "--rm",
  35. "-a", "STDOUT", "-a", "STDERR",
  36. "--name", d.Name(),
  37. "-w", workingDir,
  38. }
  39. // specify user
  40. args = append(
  41. args, "-u",
  42. fmt.Sprintf("%d:%d", os.Getuid(), os.Getgid()),
  43. )
  44. // add volumes
  45. for _, vol := range d.Volumes() {
  46. logger.Debugf("volume: %s", vol)
  47. args = append(args, "-v", vol)
  48. }
  49. // set env
  50. for k, v := range env {
  51. kv := fmt.Sprintf("%s=%s", k, v)
  52. args = append(args, "-e", kv)
  53. }
  54. // set memlimit
  55. if d.memoryLimit != 0 {
  56. args = append(args, "-m", fmt.Sprint(d.memoryLimit.Value()))
  57. }
  58. // apply options
  59. args = append(args, d.options...)
  60. // apply image and command
  61. args = append(args, d.image)
  62. // apply command
  63. args = append(args, cmdAndArgs...)
  64. cmd = exec.Command(c, args...)
  65. } else if provider.Cgroup() != nil {
  66. cmd = reexec.Command(append([]string{"tunasync-exec"}, cmdAndArgs...)...)
  67. } else {
  68. if len(cmdAndArgs) == 1 {
  69. cmd = exec.Command(cmdAndArgs[0])
  70. } else if len(cmdAndArgs) > 1 {
  71. c := cmdAndArgs[0]
  72. args := cmdAndArgs[1:]
  73. cmd = exec.Command(c, args...)
  74. } else if len(cmdAndArgs) == 0 {
  75. panic("Command length should be at least 1!")
  76. }
  77. }
  78. if provider.Docker() == nil {
  79. logger.Debugf("Executing command %s at %s", cmdAndArgs[0], workingDir)
  80. if _, err := os.Stat(workingDir); os.IsNotExist(err) {
  81. logger.Debugf("Making dir %s", workingDir)
  82. if err = os.MkdirAll(workingDir, 0755); err != nil {
  83. logger.Errorf("Error making dir %s: %s", workingDir, err.Error())
  84. }
  85. }
  86. cmd.Dir = workingDir
  87. cmd.Env = newEnviron(env, true)
  88. }
  89. return &cmdJob{
  90. cmd: cmd,
  91. workingDir: workingDir,
  92. env: env,
  93. provider: provider,
  94. }
  95. }
  96. func (c *cmdJob) Start() error {
  97. cg := c.provider.Cgroup()
  98. var (
  99. pipeR *os.File
  100. pipeW *os.File
  101. )
  102. if cg != nil {
  103. logger.Debugf("Preparing cgroup sync pipes for job %s", c.provider.Name())
  104. var err error
  105. pipeR, pipeW, err = os.Pipe();
  106. if err != nil {
  107. return err
  108. }
  109. c.cmd.ExtraFiles = []*os.File{pipeR}
  110. defer pipeR.Close()
  111. defer pipeW.Close()
  112. }
  113. logger.Debugf("Command start: %v", c.cmd.Args)
  114. c.finished = make(chan empty, 1)
  115. if err := c.cmd.Start(); err != nil {
  116. return err
  117. }
  118. if cg != nil {
  119. if err := pipeR.Close(); err != nil {
  120. return err
  121. }
  122. if c.cmd == nil || c.cmd.Process == nil {
  123. return errProcessNotStarted
  124. }
  125. pid := c.cmd.Process.Pid
  126. if cg.cgCfg.isUnified {
  127. if err := cg.cgMgrV2.AddProc(uint64(pid)); err != nil{
  128. if errors.Is(err, syscall.ESRCH) {
  129. logger.Infof("Write pid %d to cgroup failed: process vanished, ignoring")
  130. } else {
  131. return err
  132. }
  133. }
  134. } else {
  135. if err := cg.cgMgrV1.Add(cgv1.Process{Pid: pid}); err != nil{
  136. if errors.Is(err, syscall.ESRCH) {
  137. logger.Infof("Write pid %d to cgroup failed: process vanished, ignoring")
  138. } else {
  139. return err
  140. }
  141. }
  142. }
  143. if _, err := pipeW.WriteString(string(cmdCont)); err != nil {
  144. return err
  145. }
  146. }
  147. return nil
  148. }
  149. func (c *cmdJob) Wait() error {
  150. c.Lock()
  151. defer c.Unlock()
  152. select {
  153. case <-c.finished:
  154. return c.retErr
  155. default:
  156. err := c.cmd.Wait()
  157. c.retErr = err
  158. close(c.finished)
  159. return err
  160. }
  161. }
  162. func (c *cmdJob) SetLogFile(logFile *os.File) {
  163. c.cmd.Stdout = logFile
  164. c.cmd.Stderr = logFile
  165. }
  166. func (c *cmdJob) Terminate() error {
  167. if c.cmd == nil || c.cmd.Process == nil {
  168. return errProcessNotStarted
  169. }
  170. if d := c.provider.Docker(); d != nil {
  171. sh.Command(
  172. "docker", "stop", "-t", "2", d.Name(),
  173. ).Run()
  174. return nil
  175. }
  176. err := unix.Kill(c.cmd.Process.Pid, syscall.SIGTERM)
  177. if err != nil {
  178. return err
  179. }
  180. select {
  181. case <-time.After(2 * time.Second):
  182. unix.Kill(c.cmd.Process.Pid, syscall.SIGKILL)
  183. logger.Warningf("SIGTERM failed to kill the job in 2s. SIGKILL sent")
  184. case <-c.finished:
  185. }
  186. return nil
  187. }
  188. // Copied from go-sh
  189. func newEnviron(env map[string]string, inherit bool) []string { //map[string]string {
  190. environ := make([]string, 0, len(env))
  191. if inherit {
  192. for _, line := range os.Environ() {
  193. // if os environment and env collapses,
  194. // omit the os one
  195. k := strings.Split(line, "=")[0]
  196. if _, ok := env[k]; ok {
  197. continue
  198. }
  199. environ = append(environ, line)
  200. }
  201. }
  202. for k, v := range env {
  203. environ = append(environ, k+"="+v)
  204. }
  205. return environ
  206. }