cgroup.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. package worker
  2. import (
  3. "errors"
  4. "fmt"
  5. "os"
  6. "os/exec"
  7. "path/filepath"
  8. "syscall"
  9. "time"
  10. "golang.org/x/sys/unix"
  11. "github.com/moby/moby/pkg/reexec"
  12. cgv1 "github.com/containerd/cgroups"
  13. cgv2 "github.com/containerd/cgroups/v2"
  14. contspecs "github.com/opencontainers/runtime-spec/specs-go"
  15. )
  16. type cgroupHook struct {
  17. emptyHook
  18. cgCfg cgroupConfig
  19. memLimit MemBytes
  20. cgMgrV1 cgv1.Cgroup
  21. cgMgrV2 *cgv2.Manager
  22. }
  23. func init () {
  24. reexec.Register("tunasync-exec", waitExec)
  25. }
  // waitExec is the re-exec entry point registered as "tunasync-exec".
  //
  // It resolves os.Args[1] to an executable, drains file descriptor 3 until
  // the read fails or returns no data, and then replaces the current process
  // image with the resolved program, passing os.Args[1:] and the current
  // environment. Any failure panics; on success the function never returns.
  //
  // NOTE(review): fd 3 is presumably a pipe the parent closes once it has
  // finished setting up the child (e.g. moved it into its cgroup) — confirm
  // against the caller.
  func waitExec() {
  	target, err := exec.LookPath(os.Args[1])
  	if err != nil {
  		panic(err)
  	}

  	if gate := os.NewFile(3, "pipe"); gate != nil {
  		// Block until the other end signals EOF/error; the bytes are irrelevant.
  		var one [1]byte
  		for {
  			n, readErr := gate.Read(one[:])
  			if readErr != nil || n == 0 {
  				break
  			}
  		}
  		// A failed close is deliberately ignored: the descriptor is no longer needed.
  		_ = gate.Close()
  	}

  	if err := syscall.Exec(target, os.Args[1:], os.Environ()); err != nil {
  		panic(err)
  	}
  	// syscall.Exec only returns on failure, so this is a last-resort guard.
  	panic("Exec failed.")
  }
  55. func initCgroup(cfg *cgroupConfig) (error) {
  56. logger.Debugf("Initializing cgroup")
  57. baseGroup := cfg.Group
  58. //subsystem := cfg.Subsystem
  59. // If baseGroup is empty, it implies using the cgroup of the current process
  60. // otherwise, it refers to a absolute group path
  61. if baseGroup != "" {
  62. baseGroup = filepath.Join("/", baseGroup)
  63. }
  64. cfg.isUnified = cgv1.Mode() == cgv1.Unified
  65. if cfg.isUnified {
  66. logger.Debugf("Cgroup V2 detected")
  67. g := baseGroup
  68. if g == "" {
  69. logger.Debugf("Detecting my cgroup path")
  70. var err error
  71. if g, err = cgv2.NestedGroupPath(""); err != nil {
  72. return err
  73. }
  74. }
  75. logger.Infof("Using cgroup path: %s", g)
  76. var err error
  77. if cfg.cgMgrV2, err = cgv2.LoadManager("/sys/fs/cgroup", g); err != nil {
  78. return err
  79. }
  80. if baseGroup == "" {
  81. logger.Debugf("Creating a sub group and move all processes into it")
  82. wkrMgr, err := cfg.cgMgrV2.NewChild("__worker", nil);
  83. if err != nil {
  84. return err
  85. }
  86. for {
  87. logger.Debugf("Reading pids")
  88. procs, err := cfg.cgMgrV2.Procs(false)
  89. if err != nil {
  90. logger.Errorf("Cannot read pids in that group")
  91. return err
  92. }
  93. if len(procs) == 0 {
  94. break
  95. }
  96. for _, p := range(procs) {
  97. if err := wkrMgr.AddProc(p); err != nil{
  98. if errors.Is(err, syscall.ESRCH) {
  99. logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
  100. } else {
  101. return err
  102. }
  103. }
  104. }
  105. }
  106. } else {
  107. logger.Debugf("Trying to create a sub group in that group")
  108. testMgr, err := cfg.cgMgrV2.NewChild("__test", nil);
  109. if err != nil {
  110. logger.Errorf("Cannot create a sub group in the cgroup")
  111. return err
  112. }
  113. if err := testMgr.Delete(); err != nil {
  114. return err
  115. }
  116. procs, err := cfg.cgMgrV2.Procs(false)
  117. if err != nil {
  118. logger.Errorf("Cannot read pids in that group")
  119. return err
  120. }
  121. if len(procs) != 0 {
  122. return fmt.Errorf("There are remaining processes in cgroup %s", baseGroup)
  123. }
  124. }
  125. } else {
  126. logger.Debugf("Cgroup V1 detected")
  127. var pather cgv1.Path
  128. if baseGroup != "" {
  129. pather = cgv1.StaticPath(baseGroup)
  130. } else {
  131. pather = (func(p cgv1.Path) (cgv1.Path){
  132. return func(subsys cgv1.Name) (string, error){
  133. path, err := p(subsys);
  134. if err != nil {
  135. return "", err
  136. }
  137. if path == "/" {
  138. return "", cgv1.ErrControllerNotActive
  139. }
  140. return path, err
  141. }
  142. })(cgv1.NestedPath(""))
  143. }
  144. logger.Infof("Loading cgroup")
  145. var err error
  146. if cfg.cgMgrV1, err = cgv1.Load(cgv1.V1, pather); err != nil {
  147. return err
  148. }
  149. logger.Debugf("Available subsystems:")
  150. for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
  151. p, err := pather(subsys.Name())
  152. if err != nil {
  153. return err
  154. }
  155. logger.Debugf("%s: %s", subsys.Name(), p)
  156. }
  157. if baseGroup == "" {
  158. logger.Debugf("Creating a sub group and move all processes into it")
  159. wkrMgr, err := cfg.cgMgrV1.New("__worker", &contspecs.LinuxResources{});
  160. if err != nil {
  161. return err
  162. }
  163. for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
  164. logger.Debugf("Reading pids for subsystem %s", subsys.Name())
  165. for {
  166. procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false)
  167. if err != nil {
  168. p, err := pather(subsys.Name())
  169. if err != nil {
  170. return err
  171. }
  172. logger.Errorf("Cannot read pids in group %s of subsystem %s", p, subsys.Name())
  173. return err
  174. }
  175. if len(procs) == 0 {
  176. break
  177. }
  178. for _, proc := range(procs) {
  179. if err := wkrMgr.Add(proc); err != nil {
  180. if errors.Is(err, syscall.ESRCH) {
  181. logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
  182. } else {
  183. return err
  184. }
  185. }
  186. }
  187. }
  188. }
  189. } else {
  190. logger.Debugf("Trying to create a sub group in that group")
  191. testMgr, err := cfg.cgMgrV1.New("__test", &contspecs.LinuxResources{});
  192. if err != nil {
  193. logger.Errorf("Cannot create a sub group in the cgroup")
  194. return err
  195. }
  196. if err := testMgr.Delete(); err != nil {
  197. return err
  198. }
  199. for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
  200. logger.Debugf("Reading pids for subsystem %s", subsys.Name())
  201. procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false)
  202. if err != nil {
  203. p, err := pather(subsys.Name())
  204. if err != nil {
  205. return err
  206. }
  207. logger.Errorf("Cannot read pids in group %s of subsystem %s", p, subsys.Name())
  208. return err
  209. }
  210. if len(procs) != 0 {
  211. p, err := pather(subsys.Name())
  212. if err != nil {
  213. return err
  214. }
  215. return fmt.Errorf("There are remaining processes in cgroup %s of subsystem %s", p, subsys.Name())
  216. }
  217. }
  218. }
  219. }
  220. return nil
  221. }
  222. func newCgroupHook(p mirrorProvider, cfg cgroupConfig, memLimit MemBytes) *cgroupHook {
  223. return &cgroupHook{
  224. emptyHook: emptyHook{
  225. provider: p,
  226. },
  227. cgCfg: cfg,
  228. }
  229. }
  230. func (c *cgroupHook) preExec() error {
  231. if c.cgCfg.isUnified {
  232. logger.Debugf("Creating v2 cgroup for task %s", c.provider.Name())
  233. var resSet *cgv2.Resources
  234. if c.memLimit != 0 {
  235. resSet = &cgv2.Resources {
  236. Memory: &cgv2.Memory{
  237. Max: func(i int64) *int64 { return &i }(c.memLimit.Value()),
  238. },
  239. }
  240. }
  241. subMgr, err := c.cgMgrV2.NewChild(c.provider.Name(), resSet)
  242. if err != nil {
  243. logger.Errorf("Failed to create cgroup for task %s: %s", c.provider.Name(), err.Error())
  244. return err
  245. }
  246. c.cgMgrV2 = subMgr
  247. } else {
  248. logger.Debugf("Creating v1 cgroup for task %s", c.provider.Name())
  249. var resSet *contspecs.LinuxResources
  250. if c.memLimit != 0 {
  251. resSet = &contspecs.LinuxResources {
  252. Memory: &contspecs.LinuxMemory{
  253. Limit: func(i int64) *int64 { return &i }(c.memLimit.Value()),
  254. },
  255. }
  256. }
  257. subMgr, err := c.cgMgrV1.New(c.provider.Name(), resSet)
  258. if err != nil {
  259. logger.Errorf("Failed to create cgroup for task %s: %s", c.provider.Name(), err.Error())
  260. return err
  261. }
  262. c.cgMgrV1 = subMgr
  263. }
  264. return nil
  265. }
  266. func (c *cgroupHook) postExec() error {
  267. err := c.killAll()
  268. if err != nil {
  269. logger.Errorf("Error killing tasks: %s", err.Error())
  270. }
  271. if c.cgCfg.isUnified {
  272. logger.Debugf("Deleting v2 cgroup for task %s", c.provider.Name())
  273. if err := c.cgMgrV2.Delete(); err != nil {
  274. logger.Errorf("Failed to delete cgroup for task %s: %s", c.provider.Name(), err.Error())
  275. return err
  276. }
  277. c.cgMgrV2 = nil
  278. } else {
  279. logger.Debugf("Deleting v1 cgroup for task %s", c.provider.Name())
  280. if err := c.cgMgrV1.Delete(); err != nil {
  281. logger.Errorf("Failed to delete cgroup for task %s: %s", c.provider.Name(), err.Error())
  282. return err
  283. }
  284. c.cgMgrV1 = nil
  285. }
  286. return nil
  287. }
  288. func (c *cgroupHook) killAll() error {
  289. if c.cgCfg.isUnified {
  290. if c.cgMgrV2 == nil {
  291. return nil
  292. }
  293. } else {
  294. if c.cgMgrV1 == nil {
  295. return nil
  296. }
  297. }
  298. readTaskList := func() ([]int, error) {
  299. taskList := []int{}
  300. if c.cgCfg.isUnified {
  301. procs, err := c.cgMgrV2.Procs(false)
  302. if (err != nil) {
  303. return []int{}, err
  304. }
  305. for _, proc := range procs {
  306. taskList = append(taskList, int(proc))
  307. }
  308. } else {
  309. taskSet := make(map[int]struct{})
  310. for _, subsys := range(c.cgMgrV1.Subsystems()) {
  311. procs, err := c.cgMgrV1.Processes(subsys.Name(), false)
  312. if err != nil {
  313. return []int{}, err
  314. }
  315. for _, proc := range(procs) {
  316. taskSet[proc.Pid] = struct{}{}
  317. }
  318. }
  319. for proc := range(taskSet) {
  320. taskList = append(taskList, proc)
  321. }
  322. }
  323. return taskList, nil
  324. }
  325. for i := 0; i < 4; i++ {
  326. if i == 3 {
  327. return errors.New("Unable to kill all child tasks")
  328. }
  329. taskList, err := readTaskList()
  330. if err != nil {
  331. return err
  332. }
  333. if len(taskList) == 0 {
  334. return nil
  335. }
  336. for _, pid := range taskList {
  337. // TODO: deal with defunct processes
  338. logger.Debugf("Killing process: %d", pid)
  339. unix.Kill(pid, syscall.SIGKILL)
  340. }
  341. // sleep 10ms for the first round, and 1.01s, 2.01s, 3.01s for the rest
  342. time.Sleep(time.Duration(i)*time.Second + 10*time.Millisecond)
  343. }
  344. return nil
  345. }