cgroup.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package worker
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "os/exec"
  8. "path/filepath"
  9. "strconv"
  10. "syscall"
  11. "time"
  12. "golang.org/x/sys/unix"
  13. "github.com/codeskyblue/go-sh"
  14. "github.com/moby/moby/pkg/reexec"
  15. )
  16. type cgroupHook struct {
  17. emptyHook
  18. basePath string
  19. baseGroup string
  20. created bool
  21. subsystem string
  22. memLimit MemBytes
  23. }
  24. func init () {
  25. reexec.Register("tunasync-exec", waitExec)
  26. }
  // waitExec is the handler registered under "tunasync-exec". It resolves
  // os.Args[1] on PATH, blocks until file descriptor 3 reports EOF or a read
  // error, and then replaces the current process image with the resolved
  // binary, passing os.Args[1:] and the current environment.
  //
  // NOTE(review): fd 3 is presumably a pipe the parent closes once it has
  // finished preparing this process (e.g. cgroup placement) — confirm against
  // the code that spawns it.
  //
  // The function never returns normally: it either execs or panics.
  func waitExec() {
  	// Fail with a clear message instead of an index-out-of-range panic.
  	if len(os.Args) < 2 {
  		panic("tunasync-exec: no command given")
  	}

  	binary, lookErr := exec.LookPath(os.Args[1])
  	if lookErr != nil {
  		panic(lookErr)
  	}

  	if pipe := os.NewFile(3, "pipe"); pipe != nil {
  		// Drain and discard whatever arrives; only EOF (or an error) matters.
  		buf := make([]byte, 1)
  		for {
  			n, err := pipe.Read(buf)
  			if err != nil || n == 0 {
  				break
  			}
  		}
  		// Best effort: the descriptor is not needed any more and a failed
  		// close must not stop the command from being executed.
  		_ = pipe.Close()
  	}

  	if execErr := syscall.Exec(binary, os.Args[1:], os.Environ()); execErr != nil {
  		panic(execErr)
  	}
  	// syscall.Exec only returns on failure; this is a safety net.
  	panic("Exec failed.")
  }
  56. func newCgroupHook(p mirrorProvider, basePath, baseGroup, subsystem string, memLimit MemBytes) *cgroupHook {
  57. if basePath == "" {
  58. basePath = "/sys/fs/cgroup"
  59. }
  60. if baseGroup == "" {
  61. baseGroup = "tunasync"
  62. }
  63. if subsystem == "" {
  64. subsystem = "cpu"
  65. }
  66. return &cgroupHook{
  67. emptyHook: emptyHook{
  68. provider: p,
  69. },
  70. basePath: basePath,
  71. baseGroup: baseGroup,
  72. subsystem: subsystem,
  73. }
  74. }
  75. func (c *cgroupHook) preExec() error {
  76. c.created = true
  77. if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil {
  78. return err
  79. }
  80. if c.subsystem != "memory" {
  81. return nil
  82. }
  83. if c.memLimit != 0 {
  84. gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name())
  85. return sh.Command(
  86. "cgset", "-r",
  87. fmt.Sprintf("memory.limit_in_bytes=%d", c.memLimit.Value()),
  88. gname,
  89. ).Run()
  90. }
  91. return nil
  92. }
  93. func (c *cgroupHook) postExec() error {
  94. err := c.killAll()
  95. if err != nil {
  96. logger.Errorf("Error killing tasks: %s", err.Error())
  97. }
  98. c.created = false
  99. return sh.Command("cgdelete", c.Cgroup()).Run()
  100. }
  101. func (c *cgroupHook) Cgroup() string {
  102. name := c.provider.Name()
  103. return fmt.Sprintf("%s:%s/%s", c.subsystem, c.baseGroup, name)
  104. }
  105. func (c *cgroupHook) killAll() error {
  106. if !c.created {
  107. return nil
  108. }
  109. name := c.provider.Name()
  110. readTaskList := func() ([]int, error) {
  111. taskList := []int{}
  112. taskFile, err := os.Open(filepath.Join(c.basePath, c.subsystem, c.baseGroup, name, "tasks"))
  113. if err != nil {
  114. return taskList, err
  115. }
  116. defer taskFile.Close()
  117. scanner := bufio.NewScanner(taskFile)
  118. for scanner.Scan() {
  119. pid, err := strconv.Atoi(scanner.Text())
  120. if err != nil {
  121. return taskList, err
  122. }
  123. taskList = append(taskList, pid)
  124. }
  125. return taskList, nil
  126. }
  127. for i := 0; i < 4; i++ {
  128. if i == 3 {
  129. return errors.New("Unable to kill all child tasks")
  130. }
  131. taskList, err := readTaskList()
  132. if err != nil {
  133. return err
  134. }
  135. if len(taskList) == 0 {
  136. return nil
  137. }
  138. for _, pid := range taskList {
  139. // TODO: deal with defunct processes
  140. logger.Debugf("Killing process: %d", pid)
  141. unix.Kill(pid, syscall.SIGKILL)
  142. }
  143. // sleep 10ms for the first round, and 1.01s, 2.01s, 3.01s for the rest
  144. time.Sleep(time.Duration(i)*time.Second + 10*time.Millisecond)
  145. }
  146. return nil
  147. }