2
0

tunasynctl.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "os"
  8. "strconv"
  9. "strings"
  10. "text/template"
  11. "time"
  12. "github.com/BurntSushi/toml"
  13. "github.com/urfave/cli"
  14. "gopkg.in/op/go-logging.v1"
  15. tunasync "github.com/tuna/tunasync/internal"
  16. )
// Build metadata shown by the version printer in main.
// NOTE(review): presumably injected at build time (e.g. via -ldflags) — confirm
// against the build scripts.
var (
	// buildstamp is parsed as a Unix timestamp (seconds) when non-empty.
	buildstamp = ""
	// githash is printed verbatim as the "Git Hash" line.
	githash = "No githash provided"
)

// URL path suffixes appended to baseURL, and the default config locations.
const (
	listJobsPath      = "/jobs"
	listWorkersPath   = "/workers"
	flushDisabledPath = "/jobs/disabled"
	cmdPath           = "/cmd"

	systemCfgFile = "/etc/tunasync/ctl.conf"          // system-wide conf
	userCfgFile   = "$HOME/.config/tunasync/ctl.conf" // user-specific conf
)

// logger is the package-wide logger for this tool.
var logger = logging.MustGetLogger("tunasynctl")

// baseURL is the scheme://host:port prefix of the manager; set by initialize.
var baseURL string

// client is the HTTP client shared by all commands; created by initialize.
var client *http.Client
  32. func initializeWrapper(handler cli.ActionFunc) cli.ActionFunc {
  33. return func(c *cli.Context) error {
  34. err := initialize(c)
  35. if err != nil {
  36. return cli.NewExitError(err.Error(), 1)
  37. }
  38. return handler(c)
  39. }
  40. }
// config holds the settings that can be read from a TOML configuration
// file and later overridden by command-line flags in initialize.
type config struct {
	// ManagerAddr is the host part of the manager URL.
	ManagerAddr string `toml:"manager_addr"`
	// ManagerPort is the port part of the manager URL.
	ManagerPort int `toml:"manager_port"`
	// CACert is passed to the HTTP client factory; when non-empty the
	// manager URL uses the https scheme.
	CACert string `toml:"ca_cert"`
}
  46. func loadConfig(cfgFile string, cfg *config) error {
  47. if cfgFile != "" {
  48. logger.Infof("Loading config: %s", cfgFile)
  49. if _, err := toml.DecodeFile(cfgFile, cfg); err != nil {
  50. // logger.Errorf(err.Error())
  51. return err
  52. }
  53. }
  54. return nil
  55. }
  56. func initialize(c *cli.Context) error {
  57. // init logger
  58. tunasync.InitLogger(c.Bool("verbose"), c.Bool("debug"), false)
  59. cfg := new(config)
  60. // default configs
  61. cfg.ManagerAddr = "localhost"
  62. cfg.ManagerPort = 14242
  63. // find config file and load config
  64. if _, err := os.Stat(systemCfgFile); err == nil {
  65. err = loadConfig(systemCfgFile, cfg)
  66. if err != nil {
  67. return err
  68. }
  69. }
  70. logger.Debug("user config file: %s", os.ExpandEnv(userCfgFile))
  71. if _, err := os.Stat(os.ExpandEnv(userCfgFile)); err == nil {
  72. err = loadConfig(os.ExpandEnv(userCfgFile), cfg)
  73. if err != nil {
  74. return err
  75. }
  76. }
  77. if c.String("config") != "" {
  78. err := loadConfig(c.String("config"), cfg)
  79. if err != nil {
  80. return err
  81. }
  82. }
  83. // override config using the command-line arguments
  84. if c.String("manager") != "" {
  85. cfg.ManagerAddr = c.String("manager")
  86. }
  87. if c.Int("port") > 0 {
  88. cfg.ManagerPort = c.Int("port")
  89. }
  90. if c.String("ca-cert") != "" {
  91. cfg.CACert = c.String("ca-cert")
  92. }
  93. // parse base url of the manager server
  94. if cfg.CACert != "" {
  95. baseURL = fmt.Sprintf("https://%s:%d", cfg.ManagerAddr, cfg.ManagerPort)
  96. } else {
  97. baseURL = fmt.Sprintf("http://%s:%d", cfg.ManagerAddr, cfg.ManagerPort)
  98. }
  99. logger.Infof("Use manager address: %s", baseURL)
  100. // create HTTP client
  101. var err error
  102. client, err = tunasync.CreateHTTPClient(cfg.CACert)
  103. if err != nil {
  104. err = fmt.Errorf("Error initializing HTTP client: %s", err.Error())
  105. // logger.Error(err.Error())
  106. return err
  107. }
  108. return nil
  109. }
  110. func listWorkers(c *cli.Context) error {
  111. var workers []tunasync.WorkerStatus
  112. _, err := tunasync.GetJSON(baseURL+listWorkersPath, &workers, client)
  113. if err != nil {
  114. return cli.NewExitError(
  115. fmt.Sprintf("Filed to correctly get informations from"+
  116. "manager server: %s", err.Error()), 1)
  117. }
  118. b, err := json.MarshalIndent(workers, "", " ")
  119. if err != nil {
  120. return cli.NewExitError(
  121. fmt.Sprintf("Error printing out informations: %s",
  122. err.Error()),
  123. 1)
  124. }
  125. fmt.Println(string(b))
  126. return nil
  127. }
  128. func listJobs(c *cli.Context) error {
  129. var genericJobs interface{}
  130. if c.Bool("all") {
  131. var jobs []tunasync.WebMirrorStatus
  132. _, err := tunasync.GetJSON(baseURL+listJobsPath, &jobs, client)
  133. if err != nil {
  134. return cli.NewExitError(
  135. fmt.Sprintf("Failed to correctly get information "+
  136. "of all jobs from manager server: %s", err.Error()),
  137. 1)
  138. }
  139. if statusStr := c.String("status"); statusStr != "" {
  140. filteredJobs := make([]tunasync.WebMirrorStatus, 0, len(jobs))
  141. var statuses []tunasync.SyncStatus
  142. for _, s := range strings.Split(statusStr, ",") {
  143. var status tunasync.SyncStatus
  144. err = status.UnmarshalJSON([]byte("\"" + strings.TrimSpace(s) + "\""))
  145. if err != nil {
  146. return cli.NewExitError(
  147. fmt.Sprintf("Error parsing status: %s", err.Error()),
  148. 1)
  149. }
  150. statuses = append(statuses, status)
  151. }
  152. for _, job := range jobs {
  153. for _, s := range statuses {
  154. if job.Status == s {
  155. filteredJobs = append(filteredJobs, job)
  156. break
  157. }
  158. }
  159. }
  160. genericJobs = filteredJobs
  161. } else {
  162. genericJobs = jobs
  163. }
  164. } else {
  165. var jobs []tunasync.MirrorStatus
  166. args := c.Args()
  167. if len(args) == 0 {
  168. return cli.NewExitError(
  169. fmt.Sprintf("Usage Error: jobs command need at"+
  170. " least one arguments or \"--all\" flag."), 1)
  171. }
  172. ans := make(chan []tunasync.MirrorStatus, len(args))
  173. for _, workerID := range args {
  174. go func(workerID string) {
  175. var workerJobs []tunasync.MirrorStatus
  176. _, err := tunasync.GetJSON(fmt.Sprintf("%s/workers/%s/jobs",
  177. baseURL, workerID), &workerJobs, client)
  178. if err != nil {
  179. logger.Infof("Failed to correctly get jobs"+
  180. " for worker %s: %s", workerID, err.Error())
  181. }
  182. ans <- workerJobs
  183. }(workerID)
  184. }
  185. for range args {
  186. job := <-ans
  187. if job == nil {
  188. return cli.NewExitError(
  189. fmt.Sprintf("Failed to correctly get information "+
  190. "of jobs from at least one manager"),
  191. 1)
  192. }
  193. jobs = append(jobs, job...)
  194. }
  195. genericJobs = jobs
  196. }
  197. if format := c.String("format"); format != "" {
  198. tpl := template.New("")
  199. _, err := tpl.Parse(format)
  200. if err != nil {
  201. return cli.NewExitError(
  202. fmt.Sprintf("Error parsing format template: %s", err.Error()),
  203. 1)
  204. }
  205. switch jobs := genericJobs.(type) {
  206. case []tunasync.WebMirrorStatus:
  207. for _, job := range jobs {
  208. err = tpl.Execute(os.Stdout, job)
  209. if err != nil {
  210. return cli.NewExitError(
  211. fmt.Sprintf("Error printing out information: %s", err.Error()),
  212. 1)
  213. }
  214. fmt.Println()
  215. }
  216. case []tunasync.MirrorStatus:
  217. for _, job := range jobs {
  218. err = tpl.Execute(os.Stdout, job)
  219. if err != nil {
  220. return cli.NewExitError(
  221. fmt.Sprintf("Error printing out information: %s", err.Error()),
  222. 1)
  223. }
  224. fmt.Println()
  225. }
  226. }
  227. } else {
  228. b, err := json.MarshalIndent(genericJobs, "", " ")
  229. if err != nil {
  230. return cli.NewExitError(
  231. fmt.Sprintf("Error printing out information: %s", err.Error()),
  232. 1)
  233. }
  234. fmt.Println(string(b))
  235. }
  236. return nil
  237. }
  238. func updateMirrorSize(c *cli.Context) error {
  239. args := c.Args()
  240. if len(args) != 2 {
  241. return cli.NewExitError("Usage: tunasynctl set-size -w <worker-id> <mirror> <size>", 1)
  242. }
  243. workerID := c.String("worker")
  244. mirrorID := args.Get(0)
  245. mirrorSize := args.Get(1)
  246. msg := struct {
  247. Name string `json:"name"`
  248. Size string `json:"size"`
  249. }{
  250. Name: mirrorID,
  251. Size: mirrorSize,
  252. }
  253. url := fmt.Sprintf(
  254. "%s/workers/%s/jobs/%s/size", baseURL, workerID, mirrorID,
  255. )
  256. resp, err := tunasync.PostJSON(url, msg, client)
  257. if err != nil {
  258. return cli.NewExitError(
  259. fmt.Sprintf("Failed to send request to manager: %s",
  260. err.Error()),
  261. 1)
  262. }
  263. defer resp.Body.Close()
  264. body, _ := ioutil.ReadAll(resp.Body)
  265. if resp.StatusCode != http.StatusOK {
  266. return cli.NewExitError(
  267. fmt.Sprintf("Manager failed to update mirror size: %s", body), 1,
  268. )
  269. }
  270. var status tunasync.MirrorStatus
  271. json.Unmarshal(body, &status)
  272. if status.Size != mirrorSize {
  273. return cli.NewExitError(
  274. fmt.Sprintf(
  275. "Mirror size error, expecting %s, manager returned %s",
  276. mirrorSize, status.Size,
  277. ), 1,
  278. )
  279. }
  280. fmt.Printf("Successfully updated mirror size to %s\n", mirrorSize)
  281. return nil
  282. }
  283. func removeWorker(c *cli.Context) error {
  284. args := c.Args()
  285. if len(args) != 0 {
  286. return cli.NewExitError("Usage: tunasynctl -w <worker-id>", 1)
  287. }
  288. workerID := c.String("worker")
  289. if len(workerID) == 0 {
  290. return cli.NewExitError("Please specify the <worker-id>", 1)
  291. }
  292. url := fmt.Sprintf("%s/workers/%s", baseURL, workerID)
  293. req, err := http.NewRequest("DELETE", url, nil)
  294. if err != nil {
  295. logger.Panicf("Invalid HTTP Request: %s", err.Error())
  296. }
  297. resp, err := client.Do(req)
  298. if err != nil {
  299. return cli.NewExitError(
  300. fmt.Sprintf("Failed to send request to manager: %s", err.Error()), 1)
  301. }
  302. defer resp.Body.Close()
  303. if resp.StatusCode != http.StatusOK {
  304. body, err := ioutil.ReadAll(resp.Body)
  305. if err != nil {
  306. return cli.NewExitError(
  307. fmt.Sprintf("Failed to parse response: %s", err.Error()),
  308. 1)
  309. }
  310. return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
  311. " command: HTTP status code is not 200: %s", body),
  312. 1)
  313. }
  314. res := map[string]string{}
  315. err = json.NewDecoder(resp.Body).Decode(&res)
  316. if res["message"] == "deleted" {
  317. fmt.Println("Successfully removed the worker")
  318. } else {
  319. return cli.NewExitError("Failed to remove the worker", 1)
  320. }
  321. return nil
  322. }
  323. func flushDisabledJobs(c *cli.Context) error {
  324. req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil)
  325. if err != nil {
  326. logger.Panicf("Invalid HTTP Request: %s", err.Error())
  327. }
  328. resp, err := client.Do(req)
  329. if err != nil {
  330. return cli.NewExitError(
  331. fmt.Sprintf("Failed to send request to manager: %s",
  332. err.Error()),
  333. 1)
  334. }
  335. defer resp.Body.Close()
  336. if resp.StatusCode != http.StatusOK {
  337. body, err := ioutil.ReadAll(resp.Body)
  338. if err != nil {
  339. return cli.NewExitError(
  340. fmt.Sprintf("Failed to parse response: %s", err.Error()),
  341. 1)
  342. }
  343. return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
  344. " command: HTTP status code is not 200: %s", body),
  345. 1)
  346. }
  347. fmt.Println("Successfully flushed disabled jobs")
  348. return nil
  349. }
  350. func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
  351. return func(c *cli.Context) error {
  352. var mirrorID string
  353. var argsList []string
  354. if len(c.Args()) == 1 {
  355. mirrorID = c.Args()[0]
  356. } else if len(c.Args()) == 2 {
  357. mirrorID = c.Args()[0]
  358. for _, arg := range strings.Split(c.Args()[1], ",") {
  359. argsList = append(argsList, strings.TrimSpace(arg))
  360. }
  361. } else {
  362. return cli.NewExitError("Usage Error: cmd command receive just "+
  363. "1 required positional argument MIRROR and 1 optional "+
  364. "argument WORKER", 1)
  365. }
  366. options := map[string]bool{}
  367. if c.Bool("force") {
  368. options["force"] = true
  369. }
  370. cmd := tunasync.ClientCmd{
  371. Cmd: cmd,
  372. MirrorID: mirrorID,
  373. WorkerID: c.String("worker"),
  374. Args: argsList,
  375. Options: options,
  376. }
  377. resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client)
  378. if err != nil {
  379. return cli.NewExitError(
  380. fmt.Sprintf("Failed to correctly send command: %s",
  381. err.Error()),
  382. 1)
  383. }
  384. defer resp.Body.Close()
  385. if resp.StatusCode != http.StatusOK {
  386. body, err := ioutil.ReadAll(resp.Body)
  387. if err != nil {
  388. return cli.NewExitError(
  389. fmt.Sprintf("Failed to parse response: %s", err.Error()),
  390. 1)
  391. }
  392. return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
  393. " command: HTTP status code is not 200: %s", body),
  394. 1)
  395. }
  396. fmt.Println("Successfully send the command")
  397. return nil
  398. }
  399. }
  400. func cmdWorker(cmd tunasync.CmdVerb) cli.ActionFunc {
  401. return func(c *cli.Context) error {
  402. if c.String("worker") == "" {
  403. return cli.NewExitError("Please specify the worker with -w <worker-id>", 1)
  404. }
  405. cmd := tunasync.ClientCmd{
  406. Cmd: cmd,
  407. WorkerID: c.String("worker"),
  408. }
  409. resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client)
  410. if err != nil {
  411. return cli.NewExitError(
  412. fmt.Sprintf("Failed to correctly send command: %s",
  413. err.Error()),
  414. 1)
  415. }
  416. defer resp.Body.Close()
  417. if resp.StatusCode != http.StatusOK {
  418. body, err := ioutil.ReadAll(resp.Body)
  419. if err != nil {
  420. return cli.NewExitError(
  421. fmt.Sprintf("Failed to parse response: %s", err.Error()),
  422. 1)
  423. }
  424. return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
  425. " command: HTTP status code is not 200: %s", body),
  426. 1)
  427. }
  428. fmt.Println("Successfully send the command")
  429. return nil
  430. }
  431. }
  432. func main() {
  433. cli.VersionPrinter = func(c *cli.Context) {
  434. var builddate string
  435. if buildstamp == "" {
  436. builddate = "No build date provided"
  437. } else {
  438. ts, err := strconv.Atoi(buildstamp)
  439. if err != nil {
  440. builddate = "No build date provided"
  441. } else {
  442. t := time.Unix(int64(ts), 0)
  443. builddate = t.String()
  444. }
  445. }
  446. fmt.Printf(
  447. "Version: %s\n"+
  448. "Git Hash: %s\n"+
  449. "Build Date: %s\n",
  450. c.App.Version, githash, builddate,
  451. )
  452. }
  453. app := cli.NewApp()
  454. app.EnableBashCompletion = true
  455. app.Version = tunasync.Version
  456. app.Name = "tunasynctl"
  457. app.Usage = "control client for tunasync manager"
  458. commonFlags := []cli.Flag{
  459. cli.StringFlag{
  460. Name: "config, c",
  461. Usage: "Read configuration from `FILE` rather than" +
  462. " ~/.config/tunasync/ctl.conf and /etc/tunasync/ctl.conf",
  463. },
  464. cli.StringFlag{
  465. Name: "manager, m",
  466. Usage: "The manager server address",
  467. },
  468. cli.StringFlag{
  469. Name: "port, p",
  470. Usage: "The manager server port",
  471. },
  472. cli.StringFlag{
  473. Name: "ca-cert",
  474. Usage: "Trust root CA cert file `CERT`",
  475. },
  476. cli.BoolFlag{
  477. Name: "verbose, v",
  478. Usage: "Enable verbosely logging",
  479. },
  480. cli.BoolFlag{
  481. Name: "debug",
  482. Usage: "Enable debugging logging",
  483. },
  484. }
  485. cmdFlags := []cli.Flag{
  486. cli.StringFlag{
  487. Name: "worker, w",
  488. Usage: "Send the command to `WORKER`",
  489. },
  490. }
  491. forceStartFlag := cli.BoolFlag{
  492. Name: "force, f",
  493. Usage: "Override the concurrent limit",
  494. }
  495. app.Commands = []cli.Command{
  496. {
  497. Name: "list",
  498. Usage: "List jobs of workers",
  499. Flags: append(commonFlags,
  500. []cli.Flag{
  501. cli.BoolFlag{
  502. Name: "all, a",
  503. Usage: "List all jobs of all workers",
  504. },
  505. cli.StringFlag{
  506. Name: "status, s",
  507. Usage: "Filter output based on status provided",
  508. },
  509. cli.StringFlag{
  510. Name: "format, f",
  511. Usage: "Pretty-print containers using a Go template",
  512. },
  513. }...),
  514. Action: initializeWrapper(listJobs),
  515. },
  516. {
  517. Name: "flush",
  518. Usage: "Flush disabled jobs",
  519. Flags: commonFlags,
  520. Action: initializeWrapper(flushDisabledJobs),
  521. },
  522. {
  523. Name: "workers",
  524. Usage: "List workers",
  525. Flags: commonFlags,
  526. Action: initializeWrapper(listWorkers),
  527. },
  528. {
  529. Name: "rm-worker",
  530. Usage: "Remove a worker",
  531. Flags: append(
  532. commonFlags,
  533. cli.StringFlag{
  534. Name: "worker, w",
  535. Usage: "worker-id of the worker to be removed",
  536. },
  537. ),
  538. Action: initializeWrapper(removeWorker),
  539. },
  540. {
  541. Name: "set-size",
  542. Usage: "Set mirror size",
  543. Flags: append(
  544. commonFlags,
  545. cli.StringFlag{
  546. Name: "worker, w",
  547. Usage: "specify worker-id of the mirror job",
  548. },
  549. ),
  550. Action: initializeWrapper(updateMirrorSize),
  551. },
  552. {
  553. Name: "start",
  554. Usage: "Start a job",
  555. Flags: append(append(commonFlags, cmdFlags...), forceStartFlag),
  556. Action: initializeWrapper(cmdJob(tunasync.CmdStart)),
  557. },
  558. {
  559. Name: "stop",
  560. Usage: "Stop a job",
  561. Flags: append(commonFlags, cmdFlags...),
  562. Action: initializeWrapper(cmdJob(tunasync.CmdStop)),
  563. },
  564. {
  565. Name: "disable",
  566. Usage: "Disable a job",
  567. Flags: append(commonFlags, cmdFlags...),
  568. Action: initializeWrapper(cmdJob(tunasync.CmdDisable)),
  569. },
  570. {
  571. Name: "restart",
  572. Usage: "Restart a job",
  573. Flags: append(commonFlags, cmdFlags...),
  574. Action: initializeWrapper(cmdJob(tunasync.CmdRestart)),
  575. },
  576. {
  577. Name: "reload",
  578. Usage: "Tell worker to reload configurations",
  579. Flags: append(commonFlags, cmdFlags...),
  580. Action: initializeWrapper(cmdWorker(tunasync.CmdReload)),
  581. },
  582. {
  583. Name: "ping",
  584. Flags: append(commonFlags, cmdFlags...),
  585. Action: initializeWrapper(cmdJob(tunasync.CmdPing)),
  586. },
  587. }
  588. app.Run(os.Args)
  589. }