cgroup.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. package worker
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "os/exec"
  8. "path/filepath"
  9. "strconv"
  10. "syscall"
  11. "time"
  12. "golang.org/x/sys/unix"
  13. "github.com/codeskyblue/go-sh"
  14. "github.com/moby/moby/pkg/reexec"
  15. cgv1 "github.com/containerd/cgroups"
  16. cgv2 "github.com/containerd/cgroups/v2"
  17. contspecs "github.com/opencontainers/runtime-spec/specs-go"
  18. )
  19. type cgroupHook struct {
  20. emptyHook
  21. basePath string
  22. baseGroup string
  23. created bool
  24. subsystem string
  25. memLimit MemBytes
  26. }
  27. func init () {
  28. reexec.Register("tunasync-exec", waitExec)
  29. }
  30. func waitExec () {
  31. binary, lookErr := exec.LookPath(os.Args[1])
  32. if lookErr != nil {
  33. panic(lookErr)
  34. }
  35. pipe := os.NewFile(3, "pipe")
  36. if pipe != nil {
  37. for {
  38. tmpBytes := make([]byte, 1)
  39. nRead, err := pipe.Read(tmpBytes)
  40. if err != nil {
  41. break
  42. }
  43. if nRead == 0 {
  44. break
  45. }
  46. }
  47. err := pipe.Close()
  48. if err != nil {
  49. }
  50. }
  51. args := os.Args[1:]
  52. env := os.Environ()
  53. execErr := syscall.Exec(binary, args, env)
  54. if execErr != nil {
  55. panic(execErr)
  56. }
  57. panic("Exec failed.")
  58. }
  59. func initCgroup(cfg *cgroupConfig) (error) {
  60. logger.Debugf("Initializing cgroup")
  61. baseGroup := cfg.Group
  62. //subsystem := cfg.Subsystem
  63. // If baseGroup is empty, it implies using the cgroup of the current process
  64. // otherwise, it refers to a absolute group path
  65. if baseGroup != "" {
  66. baseGroup = filepath.Join("/", baseGroup)
  67. }
  68. cfg.isUnified = cgv1.Mode() == cgv1.Unified
  69. if cfg.isUnified {
  70. logger.Debugf("Cgroup V2 detected")
  71. g := baseGroup
  72. if g == "" {
  73. logger.Debugf("Detecting my cgroup path")
  74. var err error
  75. if g, err = cgv2.NestedGroupPath(""); err != nil {
  76. return err
  77. }
  78. }
  79. logger.Infof("Using cgroup path: %s", g)
  80. var err error
  81. if cfg.cgMgrV2, err = cgv2.LoadManager("/sys/fs/cgroup", g); err != nil {
  82. return err
  83. }
  84. if baseGroup == "" {
  85. logger.Debugf("Creating a sub group and move all processes into it")
  86. wkrMgr, err := cfg.cgMgrV2.NewChild("__worker", nil);
  87. if err != nil {
  88. return err
  89. }
  90. for {
  91. logger.Debugf("Reading pids")
  92. procs, err := cfg.cgMgrV2.Procs(false)
  93. if err != nil {
  94. logger.Errorf("Cannot read pids in that group")
  95. return err
  96. }
  97. if len(procs) == 0 {
  98. break
  99. }
  100. for _, p := range(procs) {
  101. if err := wkrMgr.AddProc(p); err != nil{
  102. if errors.Is(err, syscall.ESRCH) {
  103. logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
  104. } else {
  105. return err
  106. }
  107. }
  108. }
  109. }
  110. } else {
  111. logger.Debugf("Trying to create a sub group in that group")
  112. testMgr, err := cfg.cgMgrV2.NewChild("__test", nil);
  113. if err != nil {
  114. logger.Errorf("Cannot create a sub group in the cgroup")
  115. return err
  116. }
  117. if err := testMgr.Delete(); err != nil {
  118. return err
  119. }
  120. procs, err := cfg.cgMgrV2.Procs(false)
  121. if err != nil {
  122. logger.Errorf("Cannot read pids in that group")
  123. return err
  124. }
  125. if len(procs) != 0 {
  126. return fmt.Errorf("There are remaining processes in cgroup %s", baseGroup)
  127. }
  128. }
  129. } else {
  130. logger.Debugf("Cgroup V1 detected")
  131. var pather cgv1.Path
  132. if baseGroup != "" {
  133. pather = cgv1.StaticPath(baseGroup)
  134. } else {
  135. pather = (func(p cgv1.Path) (cgv1.Path){
  136. return func(subsys cgv1.Name) (string, error){
  137. path, err := p(subsys);
  138. if err != nil {
  139. return "", err
  140. }
  141. if path == "/" {
  142. return "", cgv1.ErrControllerNotActive
  143. }
  144. return path, err
  145. }
  146. })(cgv1.NestedPath(""))
  147. }
  148. logger.Infof("Loading cgroup")
  149. var err error
  150. if cfg.cgMgrV1, err = cgv1.Load(cgv1.V1, pather); err != nil {
  151. return err
  152. }
  153. logger.Debugf("Available subsystems:")
  154. for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
  155. p, err := pather(subsys.Name())
  156. if err != nil {
  157. return err
  158. }
  159. logger.Debugf("%s: %s", subsys.Name(), p)
  160. }
  161. if baseGroup == "" {
  162. logger.Debugf("Creating a sub group and move all processes into it")
  163. wkrMgr, err := cfg.cgMgrV1.New("__worker", &contspecs.LinuxResources{});
  164. if err != nil {
  165. return err
  166. }
  167. for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
  168. logger.Debugf("Reading pids for subsystem %s", subsys.Name())
  169. for {
  170. procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false)
  171. if err != nil {
  172. p, err := pather(subsys.Name())
  173. if err != nil {
  174. return err
  175. }
  176. logger.Errorf("Cannot read pids in group %s of subsystem %s", p, subsys.Name())
  177. return err
  178. }
  179. if len(procs) == 0 {
  180. break
  181. }
  182. for _, proc := range(procs) {
  183. if err := wkrMgr.Add(proc); err != nil {
  184. if errors.Is(err, syscall.ESRCH) {
  185. logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
  186. } else {
  187. return err
  188. }
  189. }
  190. }
  191. }
  192. }
  193. } else {
  194. logger.Debugf("Trying to create a sub group in that group")
  195. testMgr, err := cfg.cgMgrV1.New("__test", &contspecs.LinuxResources{});
  196. if err != nil {
  197. logger.Errorf("Cannot create a sub group in the cgroup")
  198. return err
  199. }
  200. if err := testMgr.Delete(); err != nil {
  201. return err
  202. }
  203. for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
  204. logger.Debugf("Reading pids for subsystem %s", subsys.Name())
  205. procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false)
  206. if err != nil {
  207. p, err := pather(subsys.Name())
  208. if err != nil {
  209. return err
  210. }
  211. logger.Errorf("Cannot read pids in group %s of subsystem %s", p, subsys.Name())
  212. return err
  213. }
  214. if len(procs) != 0 {
  215. p, err := pather(subsys.Name())
  216. if err != nil {
  217. return err
  218. }
  219. return fmt.Errorf("There are remaining processes in cgroup %s of subsystem %s", p, subsys.Name())
  220. }
  221. }
  222. }
  223. }
  224. return nil
  225. }
  226. func newCgroupHook(p mirrorProvider, cfg cgroupConfig, memLimit MemBytes) *cgroupHook {
  227. var (
  228. basePath = cfg.BasePath
  229. baseGroup = cfg.Group
  230. subsystem = cfg.Subsystem
  231. )
  232. if basePath == "" {
  233. basePath = "/sys/fs/cgroup"
  234. }
  235. if baseGroup == "" {
  236. baseGroup = "tunasync"
  237. }
  238. if subsystem == "" {
  239. subsystem = "cpu"
  240. }
  241. return &cgroupHook{
  242. emptyHook: emptyHook{
  243. provider: p,
  244. },
  245. basePath: basePath,
  246. baseGroup: baseGroup,
  247. subsystem: subsystem,
  248. }
  249. }
  250. func (c *cgroupHook) preExec() error {
  251. c.created = true
  252. if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil {
  253. return err
  254. }
  255. if c.subsystem != "memory" {
  256. return nil
  257. }
  258. if c.memLimit != 0 {
  259. gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name())
  260. return sh.Command(
  261. "cgset", "-r",
  262. fmt.Sprintf("memory.limit_in_bytes=%d", c.memLimit.Value()),
  263. gname,
  264. ).Run()
  265. }
  266. return nil
  267. }
  268. func (c *cgroupHook) postExec() error {
  269. err := c.killAll()
  270. if err != nil {
  271. logger.Errorf("Error killing tasks: %s", err.Error())
  272. }
  273. c.created = false
  274. return sh.Command("cgdelete", c.Cgroup()).Run()
  275. }
  276. func (c *cgroupHook) Cgroup() string {
  277. name := c.provider.Name()
  278. return fmt.Sprintf("%s:%s/%s", c.subsystem, c.baseGroup, name)
  279. }
  280. func (c *cgroupHook) killAll() error {
  281. if !c.created {
  282. return nil
  283. }
  284. name := c.provider.Name()
  285. readTaskList := func() ([]int, error) {
  286. taskList := []int{}
  287. taskFile, err := os.Open(filepath.Join(c.basePath, c.subsystem, c.baseGroup, name, "tasks"))
  288. if err != nil {
  289. return taskList, err
  290. }
  291. defer taskFile.Close()
  292. scanner := bufio.NewScanner(taskFile)
  293. for scanner.Scan() {
  294. pid, err := strconv.Atoi(scanner.Text())
  295. if err != nil {
  296. return taskList, err
  297. }
  298. taskList = append(taskList, pid)
  299. }
  300. return taskList, nil
  301. }
  302. for i := 0; i < 4; i++ {
  303. if i == 3 {
  304. return errors.New("Unable to kill all child tasks")
  305. }
  306. taskList, err := readTaskList()
  307. if err != nil {
  308. return err
  309. }
  310. if len(taskList) == 0 {
  311. return nil
  312. }
  313. for _, pid := range taskList {
  314. // TODO: deal with defunct processes
  315. logger.Debugf("Killing process: %d", pid)
  316. unix.Kill(pid, syscall.SIGKILL)
  317. }
  318. // sleep 10ms for the first round, and 1.01s, 2.01s, 3.01s for the rest
  319. time.Sleep(time.Duration(i)*time.Second + 10*time.Millisecond)
  320. }
  321. return nil
  322. }