rc.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. // Package rc provides remote control of a Syncthing process via the REST API.
  7. package rc
  8. import (
  9. "bufio"
  10. "bytes"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "io/ioutil"
  16. "log"
  17. "net/http"
  18. "net/url"
  19. "os"
  20. "os/exec"
  21. "path/filepath"
  22. "strconv"
  23. "time"
  24. "github.com/syncthing/syncthing/lib/config"
  25. "github.com/syncthing/syncthing/lib/dialer"
  26. "github.com/syncthing/syncthing/lib/protocol"
  27. "github.com/syncthing/syncthing/lib/sync"
  28. )
  29. // APIKey is set via the STGUIAPIKEY variable when we launch the binary, to
  30. // ensure that we have API access regardless of authentication settings.
  31. const APIKey = "592A47BC-A7DF-4C2F-89E0-A80B3E5094EE"
  32. type Process struct {
  33. // Set at initialization
  34. addr string
  35. // Set by eventLoop()
  36. eventMut sync.Mutex
  37. id protocol.DeviceID
  38. folders []string
  39. startComplete chan struct{}
  40. stopped chan struct{}
  41. stopErr error
  42. sequence map[string]map[string]int64 // Folder ID => Device ID => Sequence
  43. done map[string]bool // Folder ID => 100%
  44. cmd *exec.Cmd
  45. logfd *os.File
  46. }
  47. // NewProcess returns a new Process talking to Syncthing at the specified address.
  48. // Example: NewProcess("127.0.0.1:8082")
  49. func NewProcess(addr string) *Process {
  50. p := &Process{
  51. addr: addr,
  52. sequence: make(map[string]map[string]int64),
  53. done: make(map[string]bool),
  54. eventMut: sync.NewMutex(),
  55. startComplete: make(chan struct{}),
  56. stopped: make(chan struct{}),
  57. }
  58. return p
  59. }
  60. func (p *Process) ID() protocol.DeviceID {
  61. return p.id
  62. }
  63. // LogTo creates the specified log file and ensures that stdout and stderr
  64. // from the Start()ed process is redirected there. Must be called before
  65. // Start().
  66. func (p *Process) LogTo(filename string) error {
  67. if p.cmd != nil {
  68. panic("logfd cannot be set with an existing cmd")
  69. }
  70. if p.logfd != nil {
  71. p.logfd.Close()
  72. }
  73. fd, err := os.Create(filename)
  74. if err != nil {
  75. return err
  76. }
  77. p.logfd = fd
  78. return nil
  79. }
  80. // Start runs the specified Syncthing binary with the given arguments.
  81. // Syncthing should be configured to provide an API on the address given to
  82. // NewProcess. Event processing is started.
  83. func (p *Process) Start(bin string, args ...string) error {
  84. cmd := exec.Command(bin, args...)
  85. if p.logfd != nil {
  86. cmd.Stdout = p.logfd
  87. cmd.Stderr = p.logfd
  88. }
  89. cmd.Env = append(os.Environ(), "STNORESTART=1", "STGUIAPIKEY="+APIKey)
  90. err := cmd.Start()
  91. if err != nil {
  92. return err
  93. }
  94. p.cmd = cmd
  95. go p.eventLoop()
  96. go p.wait()
  97. return nil
  98. }
  99. func (p *Process) wait() {
  100. _ = p.cmd.Wait()
  101. if p.logfd != nil {
  102. p.stopErr = p.checkForProblems(p.logfd)
  103. }
  104. close(p.stopped)
  105. }
  106. // AwaitStartup waits for the Syncthing process to start and perform initial
  107. // scans of all folders.
  108. func (p *Process) AwaitStartup() {
  109. select {
  110. case <-p.startComplete:
  111. case <-p.stopped:
  112. }
  113. }
  114. // Stop stops the running Syncthing process. If the process was logging to a
  115. // local file (set by LogTo), the log file will be opened and checked for
  116. // panics and data races. The presence of either will be signalled in the form
  117. // of a returned error.
  118. func (p *Process) Stop() (*os.ProcessState, error) {
  119. select {
  120. case <-p.stopped:
  121. return p.cmd.ProcessState, p.stopErr
  122. default:
  123. }
  124. if _, err := p.Post("/rest/system/shutdown", nil); err != nil && err != io.ErrUnexpectedEOF {
  125. // Unexpected EOF is somewhat expected here, as we may exit before
  126. // returning something sensible.
  127. return nil, err
  128. }
  129. <-p.stopped
  130. return p.cmd.ProcessState, p.stopErr
  131. }
  132. // Get performs an HTTP GET and returns the bytes and/or an error. Any non-200
  133. // return code is returned as an error.
  134. func (p *Process) Get(path string) ([]byte, error) {
  135. client := &http.Client{
  136. Timeout: 30 * time.Second,
  137. Transport: &http.Transport{
  138. Dial: dialer.Dial,
  139. Proxy: http.ProxyFromEnvironment,
  140. DisableKeepAlives: true,
  141. },
  142. }
  143. url := fmt.Sprintf("http://%s%s", p.addr, path)
  144. req, err := http.NewRequest("GET", url, nil)
  145. if err != nil {
  146. return nil, err
  147. }
  148. req.Header.Add("X-API-Key", APIKey)
  149. resp, err := client.Do(req)
  150. if err != nil {
  151. return nil, err
  152. }
  153. return p.readResponse(resp)
  154. }
  155. // Post performs an HTTP POST and returns the bytes and/or an error. Any
  156. // non-200 return code is returned as an error.
  157. func (p *Process) Post(path string, data io.Reader) ([]byte, error) {
  158. client := &http.Client{
  159. Timeout: 600 * time.Second,
  160. Transport: &http.Transport{
  161. DisableKeepAlives: true,
  162. },
  163. }
  164. url := fmt.Sprintf("http://%s%s", p.addr, path)
  165. req, err := http.NewRequest("POST", url, data)
  166. if err != nil {
  167. return nil, err
  168. }
  169. req.Header.Add("X-API-Key", APIKey)
  170. req.Header.Add("Content-Type", "application/json")
  171. resp, err := client.Do(req)
  172. if err != nil {
  173. return nil, err
  174. }
  175. return p.readResponse(resp)
  176. }
  177. type Event struct {
  178. ID int
  179. Time time.Time
  180. Type string
  181. Data interface{}
  182. }
  183. func (p *Process) Events(since int) ([]Event, error) {
  184. bs, err := p.Get(fmt.Sprintf("/rest/events?since=%d&timeout=10", since))
  185. if err != nil {
  186. return nil, err
  187. }
  188. var evs []Event
  189. dec := json.NewDecoder(bytes.NewReader(bs))
  190. dec.UseNumber()
  191. err = dec.Decode(&evs)
  192. if err != nil {
  193. return nil, fmt.Errorf("Events: %s in %q", err, bs)
  194. }
  195. return evs, err
  196. }
  197. func (p *Process) Rescan(folder string) error {
  198. _, err := p.Post("/rest/db/scan?folder="+url.QueryEscape(folder), nil)
  199. return err
  200. }
  201. func (p *Process) RescanDelay(folder string, delaySeconds int) error {
  202. _, err := p.Post(fmt.Sprintf("/rest/db/scan?folder=%s&next=%d", url.QueryEscape(folder), delaySeconds), nil)
  203. return err
  204. }
  205. func (p *Process) RescanSub(folder string, sub string, delaySeconds int) error {
  206. return p.RescanSubs(folder, []string{sub}, delaySeconds)
  207. }
  208. func (p *Process) RescanSubs(folder string, subs []string, delaySeconds int) error {
  209. data := url.Values{}
  210. data.Set("folder", folder)
  211. for _, sub := range subs {
  212. data.Add("sub", sub)
  213. }
  214. data.Set("next", strconv.Itoa(delaySeconds))
  215. _, err := p.Post("/rest/db/scan?"+data.Encode(), nil)
  216. return err
  217. }
  218. func (p *Process) ConfigInSync() (bool, error) {
  219. bs, err := p.Get("/rest/system/config/insync")
  220. if err != nil {
  221. return false, err
  222. }
  223. return bytes.Contains(bs, []byte("true")), nil
  224. }
  225. func (p *Process) GetConfig() (config.Configuration, error) {
  226. var cfg config.Configuration
  227. bs, err := p.Get("/rest/system/config")
  228. if err != nil {
  229. return cfg, err
  230. }
  231. err = json.Unmarshal(bs, &cfg)
  232. return cfg, err
  233. }
  234. func (p *Process) PostConfig(cfg config.Configuration) error {
  235. buf := new(bytes.Buffer)
  236. if err := json.NewEncoder(buf).Encode(cfg); err != nil {
  237. return err
  238. }
  239. _, err := p.Post("/rest/system/config", buf)
  240. return err
  241. }
  242. func (p *Process) PauseDevice(dev protocol.DeviceID) error {
  243. _, err := p.Post("/rest/system/pause?device="+dev.String(), nil)
  244. return err
  245. }
  246. func (p *Process) ResumeDevice(dev protocol.DeviceID) error {
  247. _, err := p.Post("/rest/system/resume?device="+dev.String(), nil)
  248. return err
  249. }
  250. func (p *Process) PauseAll() error {
  251. _, err := p.Post("/rest/system/pause", nil)
  252. return err
  253. }
  254. func (p *Process) ResumeAll() error {
  255. _, err := p.Post("/rest/system/resume", nil)
  256. return err
  257. }
  258. func InSync(folder string, ps ...*Process) bool {
  259. for _, p := range ps {
  260. p.eventMut.Lock()
  261. }
  262. defer func() {
  263. for _, p := range ps {
  264. p.eventMut.Unlock()
  265. }
  266. }()
  267. for i := range ps {
  268. // If our latest FolderSummary didn't report 100%, then we are not done.
  269. if !ps[i].done[folder] {
  270. l.Debugf("done = ps[%d].done[%q] = false", i, folder)
  271. return false
  272. }
  273. // Check Sequence for each device. The local version seen by remote
  274. // devices should be the same as what it has locally, or the index
  275. // hasn't been sent yet.
  276. sourceID := ps[i].id.String()
  277. sourceSeq := ps[i].sequence[folder][sourceID]
  278. l.Debugf("sourceSeq = ps[%d].sequence[%q][%q] = %d", i, folder, sourceID, sourceSeq)
  279. for j := range ps {
  280. if i != j {
  281. remoteSeq := ps[j].sequence[folder][sourceID]
  282. if remoteSeq != sourceSeq {
  283. l.Debugf("remoteSeq = ps[%d].sequence[%q][%q] = %d", j, folder, sourceID, remoteSeq)
  284. return false
  285. }
  286. }
  287. }
  288. }
  289. return true
  290. }
  291. func AwaitSync(folder string, ps ...*Process) {
  292. for {
  293. time.Sleep(250 * time.Millisecond)
  294. if InSync(folder, ps...) {
  295. return
  296. }
  297. }
  298. }
  299. type Model struct {
  300. GlobalBytes int
  301. GlobalDeleted int
  302. GlobalFiles int
  303. InSyncBytes int
  304. InSyncFiles int
  305. Invalid string
  306. LocalBytes int
  307. LocalDeleted int
  308. LocalFiles int
  309. NeedBytes int
  310. NeedFiles int
  311. State string
  312. StateChanged time.Time
  313. Version int
  314. }
  315. func (p *Process) Model(folder string) (Model, error) {
  316. bs, err := p.Get("/rest/db/status?folder=" + url.QueryEscape(folder))
  317. if err != nil {
  318. return Model{}, err
  319. }
  320. var res Model
  321. if err := json.Unmarshal(bs, &res); err != nil {
  322. return Model{}, err
  323. }
  324. l.Debugf("%+v", res)
  325. return res, nil
  326. }
  327. func (p *Process) readResponse(resp *http.Response) ([]byte, error) {
  328. bs, err := ioutil.ReadAll(resp.Body)
  329. resp.Body.Close()
  330. if err != nil {
  331. return bs, err
  332. }
  333. if resp.StatusCode != 200 {
  334. return bs, fmt.Errorf("%s", resp.Status)
  335. }
  336. return bs, nil
  337. }
  338. func (p *Process) checkForProblems(logfd *os.File) error {
  339. fd, err := os.Open(logfd.Name())
  340. if err != nil {
  341. return err
  342. }
  343. defer fd.Close()
  344. raceConditionStart := []byte("WARNING: DATA RACE")
  345. raceConditionSep := []byte("==================")
  346. panicConditionStart := []byte("panic:")
  347. panicConditionSep := []byte("[") // fallback if we don't already know our ID
  348. if p.id.String() != "" {
  349. panicConditionSep = []byte(p.id.String()[:5])
  350. }
  351. sc := bufio.NewScanner(fd)
  352. race := false
  353. _panic := false
  354. for sc.Scan() {
  355. line := sc.Bytes()
  356. if race || _panic {
  357. if bytes.Contains(line, panicConditionSep) {
  358. _panic = false
  359. continue
  360. }
  361. fmt.Printf("%s\n", line)
  362. if bytes.Contains(line, raceConditionSep) {
  363. race = false
  364. }
  365. } else if bytes.Contains(line, raceConditionStart) {
  366. fmt.Printf("%s\n", raceConditionSep)
  367. fmt.Printf("%s\n", raceConditionStart)
  368. race = true
  369. if err == nil {
  370. err = errors.New("Race condition detected")
  371. }
  372. } else if bytes.Contains(line, panicConditionStart) {
  373. _panic = true
  374. if err == nil {
  375. err = errors.New("Panic detected")
  376. }
  377. }
  378. }
  379. return err
  380. }
  381. func (p *Process) eventLoop() {
  382. since := 0
  383. notScanned := make(map[string]struct{})
  384. start := time.Now()
  385. for {
  386. select {
  387. case <-p.stopped:
  388. return
  389. default:
  390. }
  391. events, err := p.Events(since)
  392. if err != nil {
  393. if time.Since(start) < 5*time.Second {
  394. // The API has probably not started yet, lets give it some time.
  395. continue
  396. }
  397. // If we're stopping, no need to print the error.
  398. select {
  399. case <-p.stopped:
  400. return
  401. default:
  402. }
  403. log.Println("eventLoop: events:", err)
  404. continue
  405. }
  406. for _, ev := range events {
  407. if ev.ID != since+1 {
  408. l.Warnln("Event ID jumped", since, "to", ev.ID)
  409. }
  410. since = ev.ID
  411. switch ev.Type {
  412. case "Starting":
  413. // The Starting event tells us where the configuration is. Load
  414. // it and populate our list of folders.
  415. data := ev.Data.(map[string]interface{})
  416. id, err := protocol.DeviceIDFromString(data["myID"].(string))
  417. if err != nil {
  418. log.Println("eventLoop: DeviceIdFromString:", err)
  419. continue
  420. }
  421. p.id = id
  422. home := data["home"].(string)
  423. w, err := config.Load(filepath.Join(home, "config.xml"), protocol.LocalDeviceID)
  424. if err != nil {
  425. log.Println("eventLoop: Starting:", err)
  426. continue
  427. }
  428. for id := range w.Folders() {
  429. p.eventMut.Lock()
  430. p.folders = append(p.folders, id)
  431. p.eventMut.Unlock()
  432. notScanned[id] = struct{}{}
  433. }
  434. l.Debugln("Started", p.id)
  435. case "StateChanged":
  436. // When a folder changes to idle, we tick it off by removing
  437. // it from p.notScanned.
  438. if len(p.folders) == 0 {
  439. // We haven't parsed the config yet, shouldn't happen
  440. panic("race, or lost startup event")
  441. }
  442. select {
  443. case <-p.startComplete:
  444. default:
  445. data := ev.Data.(map[string]interface{})
  446. to := data["to"].(string)
  447. if to == "idle" {
  448. folder := data["folder"].(string)
  449. delete(notScanned, folder)
  450. if len(notScanned) == 0 {
  451. close(p.startComplete)
  452. }
  453. }
  454. }
  455. case "LocalIndexUpdated":
  456. data := ev.Data.(map[string]interface{})
  457. folder := data["folder"].(string)
  458. version, _ := data["version"].(json.Number).Int64()
  459. p.eventMut.Lock()
  460. m := p.sequence[folder]
  461. if m == nil {
  462. m = make(map[string]int64)
  463. }
  464. device := p.id.String()
  465. if device == "" {
  466. panic("race, or startup not complete")
  467. }
  468. m[device] = version
  469. p.sequence[folder] = m
  470. p.done[folder] = false
  471. l.Debugf("LocalIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  472. p.eventMut.Unlock()
  473. case "RemoteIndexUpdated":
  474. data := ev.Data.(map[string]interface{})
  475. device := data["device"].(string)
  476. folder := data["folder"].(string)
  477. version, _ := data["version"].(json.Number).Int64()
  478. p.eventMut.Lock()
  479. m := p.sequence[folder]
  480. if m == nil {
  481. m = make(map[string]int64)
  482. }
  483. m[device] = version
  484. p.sequence[folder] = m
  485. p.done[folder] = false
  486. l.Debugf("RemoteIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  487. p.eventMut.Unlock()
  488. case "FolderSummary":
  489. data := ev.Data.(map[string]interface{})
  490. folder := data["folder"].(string)
  491. summary := data["summary"].(map[string]interface{})
  492. need, _ := summary["needBytes"].(json.Number).Int64()
  493. done := need == 0
  494. p.eventMut.Lock()
  495. p.done[folder] = done
  496. l.Debugf("Foldersummary %v %v\n\t%+v", p.id, folder, p.done)
  497. p.eventMut.Unlock()
  498. }
  499. }
  500. }
  501. }
  502. type ConnectionStats struct {
  503. Address string
  504. Type string
  505. Connected bool
  506. Paused bool
  507. ClientVersion string
  508. InBytesTotal int64
  509. OutBytesTotal int64
  510. }
  511. func (p *Process) Connections() (map[string]ConnectionStats, error) {
  512. bs, err := p.Get("/rest/system/connections")
  513. if err != nil {
  514. return nil, err
  515. }
  516. var res map[string]ConnectionStats
  517. if err := json.Unmarshal(bs, &res); err != nil {
  518. return nil, err
  519. }
  520. return res, nil
  521. }
  522. type SystemStatus struct {
  523. Alloc int64
  524. CPUPercent float64
  525. Goroutines int
  526. MyID protocol.DeviceID
  527. PathSeparator string
  528. StartTime time.Time
  529. Sys int64
  530. Themes []string
  531. Tilde string
  532. Uptime int
  533. }
  534. func (p *Process) SystemStatus() (SystemStatus, error) {
  535. bs, err := p.Get("/rest/system/status")
  536. if err != nil {
  537. return SystemStatus{}, err
  538. }
  539. var res SystemStatus
  540. if err := json.Unmarshal(bs, &res); err != nil {
  541. return SystemStatus{}, err
  542. }
  543. return res, nil
  544. }
  545. type SystemVersion struct {
  546. Arch string
  547. Codename string
  548. LongVersion string
  549. OS string
  550. Version string
  551. }
  552. func (p *Process) SystemVersion() (SystemVersion, error) {
  553. bs, err := p.Get("/rest/system/version")
  554. if err != nil {
  555. return SystemVersion{}, err
  556. }
  557. var res SystemVersion
  558. if err := json.Unmarshal(bs, &res); err != nil {
  559. return SystemVersion{}, err
  560. }
  561. return res, nil
  562. }