cgroup.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package worker
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "os/exec"
  8. "path/filepath"
  9. "strconv"
  10. "syscall"
  11. "time"
  12. "golang.org/x/sys/unix"
  13. "github.com/codeskyblue/go-sh"
  14. "github.com/moby/moby/pkg/reexec"
  15. )
  16. type cgroupHook struct {
  17. emptyHook
  18. basePath string
  19. baseGroup string
  20. created bool
  21. subsystem string
  22. memLimit MemBytes
  23. }
  24. func init () {
  25. reexec.Register("tunasync-exec", waitExec)
  26. }
  27. func waitExec () {
  28. binary, lookErr := exec.LookPath(os.Args[1])
  29. if lookErr != nil {
  30. panic(lookErr)
  31. }
  32. pipe := os.NewFile(3, "pipe")
  33. if pipe != nil {
  34. for {
  35. tmpBytes := make([]byte, 1)
  36. nRead, err := pipe.Read(tmpBytes)
  37. if err != nil {
  38. break
  39. }
  40. if nRead == 0 {
  41. break
  42. }
  43. }
  44. err := pipe.Close()
  45. if err != nil {
  46. }
  47. }
  48. args := os.Args[1:]
  49. env := os.Environ()
  50. execErr := syscall.Exec(binary, args, env)
  51. if execErr != nil {
  52. panic(execErr)
  53. }
  54. panic("Exec failed.")
  55. }
  56. func newCgroupHook(p mirrorProvider, cfg cgroupConfig, memLimit MemBytes) *cgroupHook {
  57. var (
  58. basePath = cfg.BasePath
  59. baseGroup = cfg.Group
  60. subsystem = cfg.Subsystem
  61. )
  62. if basePath == "" {
  63. basePath = "/sys/fs/cgroup"
  64. }
  65. if baseGroup == "" {
  66. baseGroup = "tunasync"
  67. }
  68. if subsystem == "" {
  69. subsystem = "cpu"
  70. }
  71. return &cgroupHook{
  72. emptyHook: emptyHook{
  73. provider: p,
  74. },
  75. basePath: basePath,
  76. baseGroup: baseGroup,
  77. subsystem: subsystem,
  78. }
  79. }
  80. func (c *cgroupHook) preExec() error {
  81. c.created = true
  82. if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil {
  83. return err
  84. }
  85. if c.subsystem != "memory" {
  86. return nil
  87. }
  88. if c.memLimit != 0 {
  89. gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name())
  90. return sh.Command(
  91. "cgset", "-r",
  92. fmt.Sprintf("memory.limit_in_bytes=%d", c.memLimit.Value()),
  93. gname,
  94. ).Run()
  95. }
  96. return nil
  97. }
  98. func (c *cgroupHook) postExec() error {
  99. err := c.killAll()
  100. if err != nil {
  101. logger.Errorf("Error killing tasks: %s", err.Error())
  102. }
  103. c.created = false
  104. return sh.Command("cgdelete", c.Cgroup()).Run()
  105. }
  106. func (c *cgroupHook) Cgroup() string {
  107. name := c.provider.Name()
  108. return fmt.Sprintf("%s:%s/%s", c.subsystem, c.baseGroup, name)
  109. }
  110. func (c *cgroupHook) killAll() error {
  111. if !c.created {
  112. return nil
  113. }
  114. name := c.provider.Name()
  115. readTaskList := func() ([]int, error) {
  116. taskList := []int{}
  117. taskFile, err := os.Open(filepath.Join(c.basePath, c.subsystem, c.baseGroup, name, "tasks"))
  118. if err != nil {
  119. return taskList, err
  120. }
  121. defer taskFile.Close()
  122. scanner := bufio.NewScanner(taskFile)
  123. for scanner.Scan() {
  124. pid, err := strconv.Atoi(scanner.Text())
  125. if err != nil {
  126. return taskList, err
  127. }
  128. taskList = append(taskList, pid)
  129. }
  130. return taskList, nil
  131. }
  132. for i := 0; i < 4; i++ {
  133. if i == 3 {
  134. return errors.New("Unable to kill all child tasks")
  135. }
  136. taskList, err := readTaskList()
  137. if err != nil {
  138. return err
  139. }
  140. if len(taskList) == 0 {
  141. return nil
  142. }
  143. for _, pid := range taskList {
  144. // TODO: deal with defunct processes
  145. logger.Debugf("Killing process: %d", pid)
  146. unix.Kill(pid, syscall.SIGKILL)
  147. }
  148. // sleep 10ms for the first round, and 1.01s, 2.01s, 3.01s for the rest
  149. time.Sleep(time.Duration(i)*time.Second + 10*time.Millisecond)
  150. }
  151. return nil
  152. }