rc.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. // Package rc provides remote control of a Syncthing process via the REST API.
  7. package rc
  8. import (
  9. "bufio"
  10. "bytes"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "io/ioutil"
  16. "log"
  17. "net/http"
  18. "net/url"
  19. "os"
  20. "os/exec"
  21. "path/filepath"
  22. "strconv"
  23. "time"
  24. "github.com/syncthing/syncthing/lib/config"
  25. "github.com/syncthing/syncthing/lib/dialer"
  26. "github.com/syncthing/syncthing/lib/events"
  27. "github.com/syncthing/syncthing/lib/protocol"
  28. "github.com/syncthing/syncthing/lib/sync"
  29. )
  // APIKey is the fixed API key handed to the launched binary through the
  // STGUIAPIKEY environment variable, and sent as the X-API-Key header on
  // every request, so that the REST API is reachable regardless of the
  // authentication settings of the instance.
  const (
  	APIKey = "592A47BC-A7DF-4C2F-89E0-A80B3E5094EE"
  )
  33. type Process struct {
  34. // Set at initialization
  35. addr string
  36. // Set by eventLoop()
  37. eventMut sync.Mutex
  38. id protocol.DeviceID
  39. folders []string
  40. startComplete chan struct{}
  41. stopped chan struct{}
  42. stopErr error
  43. sequence map[string]map[string]int64 // Folder ID => Device ID => Sequence
  44. done map[string]bool // Folder ID => 100%
  45. cmd *exec.Cmd
  46. logfd *os.File
  47. }
  48. // NewProcess returns a new Process talking to Syncthing at the specified address.
  49. // Example: NewProcess("127.0.0.1:8082")
  50. func NewProcess(addr string) *Process {
  51. p := &Process{
  52. addr: addr,
  53. sequence: make(map[string]map[string]int64),
  54. done: make(map[string]bool),
  55. eventMut: sync.NewMutex(),
  56. startComplete: make(chan struct{}),
  57. stopped: make(chan struct{}),
  58. }
  59. return p
  60. }
  61. func (p *Process) ID() protocol.DeviceID {
  62. return p.id
  63. }
  64. // LogTo creates the specified log file and ensures that stdout and stderr
  65. // from the Start()ed process is redirected there. Must be called before
  66. // Start().
  67. func (p *Process) LogTo(filename string) error {
  68. if p.cmd != nil {
  69. panic("logfd cannot be set with an existing cmd")
  70. }
  71. if p.logfd != nil {
  72. p.logfd.Close()
  73. }
  74. fd, err := os.Create(filename)
  75. if err != nil {
  76. return err
  77. }
  78. p.logfd = fd
  79. return nil
  80. }
  81. // Start runs the specified Syncthing binary with the given arguments.
  82. // Syncthing should be configured to provide an API on the address given to
  83. // NewProcess. Event processing is started.
  84. func (p *Process) Start(bin string, args ...string) error {
  85. cmd := exec.Command(bin, args...)
  86. if p.logfd != nil {
  87. cmd.Stdout = p.logfd
  88. cmd.Stderr = p.logfd
  89. }
  90. cmd.Env = append(os.Environ(), "STNORESTART=1", "STGUIAPIKEY="+APIKey)
  91. err := cmd.Start()
  92. if err != nil {
  93. return err
  94. }
  95. p.cmd = cmd
  96. go p.eventLoop()
  97. go p.wait()
  98. return nil
  99. }
  100. func (p *Process) wait() {
  101. p.cmd.Wait()
  102. if p.logfd != nil {
  103. p.stopErr = p.checkForProblems(p.logfd)
  104. }
  105. close(p.stopped)
  106. }
  107. // AwaitStartup waits for the Syncthing process to start and perform initial
  108. // scans of all folders.
  109. func (p *Process) AwaitStartup() {
  110. select {
  111. case <-p.startComplete:
  112. case <-p.stopped:
  113. }
  114. }
  115. // Stop stops the running Syncthing process. If the process was logging to a
  116. // local file (set by LogTo), the log file will be opened and checked for
  117. // panics and data races. The presence of either will be signalled in the form
  118. // of a returned error.
  119. func (p *Process) Stop() (*os.ProcessState, error) {
  120. select {
  121. case <-p.stopped:
  122. return p.cmd.ProcessState, p.stopErr
  123. default:
  124. }
  125. if _, err := p.Post("/rest/system/shutdown", nil); err != nil && err != io.ErrUnexpectedEOF {
  126. // Unexpected EOF is somewhat expected here, as we may exit before
  127. // returning something sensible.
  128. return nil, err
  129. }
  130. <-p.stopped
  131. return p.cmd.ProcessState, p.stopErr
  132. }
  133. // Get performs an HTTP GET and returns the bytes and/or an error. Any non-200
  134. // return code is returned as an error.
  135. func (p *Process) Get(path string) ([]byte, error) {
  136. client := &http.Client{
  137. Timeout: 30 * time.Second,
  138. Transport: &http.Transport{
  139. Dial: dialer.Dial,
  140. Proxy: http.ProxyFromEnvironment,
  141. DisableKeepAlives: true,
  142. },
  143. }
  144. url := fmt.Sprintf("http://%s%s", p.addr, path)
  145. req, err := http.NewRequest("GET", url, nil)
  146. if err != nil {
  147. return nil, err
  148. }
  149. req.Header.Add("X-API-Key", APIKey)
  150. resp, err := client.Do(req)
  151. if err != nil {
  152. return nil, err
  153. }
  154. return p.readResponse(resp)
  155. }
  156. // Post performs an HTTP POST and returns the bytes and/or an error. Any
  157. // non-200 return code is returned as an error.
  158. func (p *Process) Post(path string, data io.Reader) ([]byte, error) {
  159. client := &http.Client{
  160. Timeout: 600 * time.Second,
  161. Transport: &http.Transport{
  162. DisableKeepAlives: true,
  163. },
  164. }
  165. url := fmt.Sprintf("http://%s%s", p.addr, path)
  166. req, err := http.NewRequest("POST", url, data)
  167. if err != nil {
  168. return nil, err
  169. }
  170. req.Header.Add("X-API-Key", APIKey)
  171. req.Header.Add("Content-Type", "application/json")
  172. resp, err := client.Do(req)
  173. if err != nil {
  174. return nil, err
  175. }
  176. return p.readResponse(resp)
  177. }
  // Event is one entry of the list returned by the events API, as decoded by
  // Events.
  type Event struct {
  	// ID is the event number; eventLoop expects consecutive values.
  	ID int
  	// Time is the timestamp carried by the event.
  	Time time.Time
  	// Type is the event name, for example "Starting" or "StateChanged".
  	Type string
  	// Data is the event specific payload in its generic decoded form.
  	Data interface{}
  }
  184. func (p *Process) Events(since int) ([]Event, error) {
  185. bs, err := p.Get(fmt.Sprintf("/rest/events?since=%d&timeout=10", since))
  186. if err != nil {
  187. return nil, err
  188. }
  189. var evs []Event
  190. dec := json.NewDecoder(bytes.NewReader(bs))
  191. dec.UseNumber()
  192. err = dec.Decode(&evs)
  193. if err != nil {
  194. return nil, fmt.Errorf("Events: %s in %q", err, bs)
  195. }
  196. return evs, err
  197. }
  198. func (p *Process) Rescan(folder string) error {
  199. _, err := p.Post("/rest/db/scan?folder="+url.QueryEscape(folder), nil)
  200. return err
  201. }
  202. func (p *Process) RescanDelay(folder string, delaySeconds int) error {
  203. _, err := p.Post(fmt.Sprintf("/rest/db/scan?folder=%s&next=%d", url.QueryEscape(folder), delaySeconds), nil)
  204. return err
  205. }
  206. func (p *Process) RescanSub(folder string, sub string, delaySeconds int) error {
  207. return p.RescanSubs(folder, []string{sub}, delaySeconds)
  208. }
  209. func (p *Process) RescanSubs(folder string, subs []string, delaySeconds int) error {
  210. data := url.Values{}
  211. data.Set("folder", folder)
  212. for _, sub := range subs {
  213. data.Add("sub", sub)
  214. }
  215. data.Set("next", strconv.Itoa(delaySeconds))
  216. _, err := p.Post("/rest/db/scan?"+data.Encode(), nil)
  217. return err
  218. }
  219. func (p *Process) ConfigInSync() (bool, error) {
  220. bs, err := p.Get("/rest/system/config/insync")
  221. if err != nil {
  222. return false, err
  223. }
  224. return bytes.Contains(bs, []byte("true")), nil
  225. }
  226. func (p *Process) GetConfig() (config.Configuration, error) {
  227. var cfg config.Configuration
  228. bs, err := p.Get("/rest/system/config")
  229. if err != nil {
  230. return cfg, err
  231. }
  232. err = json.Unmarshal(bs, &cfg)
  233. return cfg, err
  234. }
  235. func (p *Process) PostConfig(cfg config.Configuration) error {
  236. buf := new(bytes.Buffer)
  237. if err := json.NewEncoder(buf).Encode(cfg); err != nil {
  238. return err
  239. }
  240. _, err := p.Post("/rest/system/config", buf)
  241. return err
  242. }
  243. func (p *Process) PauseDevice(dev protocol.DeviceID) error {
  244. _, err := p.Post("/rest/system/pause?device="+dev.String(), nil)
  245. return err
  246. }
  247. func (p *Process) ResumeDevice(dev protocol.DeviceID) error {
  248. _, err := p.Post("/rest/system/resume?device="+dev.String(), nil)
  249. return err
  250. }
  251. func (p *Process) PauseAll() error {
  252. _, err := p.Post("/rest/system/pause", nil)
  253. return err
  254. }
  255. func (p *Process) ResumeAll() error {
  256. _, err := p.Post("/rest/system/resume", nil)
  257. return err
  258. }
  259. func InSync(folder string, ps ...*Process) bool {
  260. for _, p := range ps {
  261. p.eventMut.Lock()
  262. }
  263. defer func() {
  264. for _, p := range ps {
  265. p.eventMut.Unlock()
  266. }
  267. }()
  268. for i := range ps {
  269. // If our latest FolderSummary didn't report 100%, then we are not done.
  270. if !ps[i].done[folder] {
  271. l.Debugf("done = ps[%d].done[%q] = false", i, folder)
  272. return false
  273. }
  274. // Check Sequence for each device. The local version seen by remote
  275. // devices should be the same as what it has locally, or the index
  276. // hasn't been sent yet.
  277. sourceID := ps[i].id.String()
  278. sourceSeq := ps[i].sequence[folder][sourceID]
  279. l.Debugf("sourceSeq = ps[%d].sequence[%q][%q] = %d", i, folder, sourceID, sourceSeq)
  280. for j := range ps {
  281. if i != j {
  282. remoteSeq := ps[j].sequence[folder][sourceID]
  283. if remoteSeq != sourceSeq {
  284. l.Debugf("remoteSeq = ps[%d].sequence[%q][%q] = %d", j, folder, sourceID, remoteSeq)
  285. return false
  286. }
  287. }
  288. }
  289. }
  290. return true
  291. }
  292. func AwaitSync(folder string, ps ...*Process) {
  293. for {
  294. time.Sleep(250 * time.Millisecond)
  295. if InSync(folder, ps...) {
  296. return
  297. }
  298. }
  299. }
  // Model is the decoded form of the folder status returned by the db/status
  // endpoint, see (*Process).Model.
  type Model struct {
  	GlobalBytes, GlobalDeleted, GlobalFiles int
  	InSyncBytes, InSyncFiles                int

  	Invalid string

  	LocalBytes, LocalDeleted, LocalFiles int
  	NeedBytes, NeedFiles                 int

  	State        string
  	StateChanged time.Time
  	Version      int
  }
  316. func (p *Process) Model(folder string) (Model, error) {
  317. bs, err := p.Get("/rest/db/status?folder=" + url.QueryEscape(folder))
  318. if err != nil {
  319. return Model{}, err
  320. }
  321. var res Model
  322. if err := json.Unmarshal(bs, &res); err != nil {
  323. return Model{}, err
  324. }
  325. l.Debugf("%+v", res)
  326. return res, nil
  327. }
  328. func (p *Process) readResponse(resp *http.Response) ([]byte, error) {
  329. bs, err := ioutil.ReadAll(resp.Body)
  330. resp.Body.Close()
  331. if err != nil {
  332. return bs, err
  333. }
  334. if resp.StatusCode != 200 {
  335. return bs, fmt.Errorf("%s", resp.Status)
  336. }
  337. return bs, nil
  338. }
  339. func (p *Process) checkForProblems(logfd *os.File) error {
  340. fd, err := os.Open(logfd.Name())
  341. if err != nil {
  342. return err
  343. }
  344. defer fd.Close()
  345. raceConditionStart := []byte("WARNING: DATA RACE")
  346. raceConditionSep := []byte("==================")
  347. panicConditionStart := []byte("panic:")
  348. panicConditionSep := []byte("[") // fallback if we don't already know our ID
  349. if p.id.String() != "" {
  350. panicConditionSep = []byte(p.id.String()[:5])
  351. }
  352. sc := bufio.NewScanner(fd)
  353. race := false
  354. _panic := false
  355. for sc.Scan() {
  356. line := sc.Bytes()
  357. if race || _panic {
  358. if bytes.Contains(line, panicConditionSep) {
  359. _panic = false
  360. continue
  361. }
  362. fmt.Printf("%s\n", line)
  363. if bytes.Contains(line, raceConditionSep) {
  364. race = false
  365. }
  366. } else if bytes.Contains(line, raceConditionStart) {
  367. fmt.Printf("%s\n", raceConditionSep)
  368. fmt.Printf("%s\n", raceConditionStart)
  369. race = true
  370. if err == nil {
  371. err = errors.New("Race condition detected")
  372. }
  373. } else if bytes.Contains(line, panicConditionStart) {
  374. _panic = true
  375. if err == nil {
  376. err = errors.New("Panic detected")
  377. }
  378. }
  379. }
  380. return err
  381. }
  382. func (p *Process) eventLoop() {
  383. since := 0
  384. notScanned := make(map[string]struct{})
  385. start := time.Now()
  386. for {
  387. select {
  388. case <-p.stopped:
  389. return
  390. default:
  391. }
  392. evs, err := p.Events(since)
  393. if err != nil {
  394. if time.Since(start) < 5*time.Second {
  395. // The API has probably not started yet, lets give it some time.
  396. continue
  397. }
  398. // If we're stopping, no need to print the error.
  399. select {
  400. case <-p.stopped:
  401. return
  402. default:
  403. }
  404. log.Println("eventLoop: events:", err)
  405. continue
  406. }
  407. for _, ev := range evs {
  408. if ev.ID != since+1 {
  409. l.Warnln("Event ID jumped", since, "to", ev.ID)
  410. }
  411. since = ev.ID
  412. switch ev.Type {
  413. case "Starting":
  414. // The Starting event tells us where the configuration is. Load
  415. // it and populate our list of folders.
  416. data := ev.Data.(map[string]interface{})
  417. id, err := protocol.DeviceIDFromString(data["myID"].(string))
  418. if err != nil {
  419. log.Println("eventLoop: DeviceIdFromString:", err)
  420. continue
  421. }
  422. p.id = id
  423. home := data["home"].(string)
  424. w, err := config.Load(filepath.Join(home, "config.xml"), protocol.LocalDeviceID, events.NoopLogger)
  425. if err != nil {
  426. log.Println("eventLoop: Starting:", err)
  427. continue
  428. }
  429. for id := range w.Folders() {
  430. p.eventMut.Lock()
  431. p.folders = append(p.folders, id)
  432. p.eventMut.Unlock()
  433. notScanned[id] = struct{}{}
  434. }
  435. l.Debugln("Started", p.id)
  436. case "StateChanged":
  437. // When a folder changes to idle, we tick it off by removing
  438. // it from p.notScanned.
  439. if len(p.folders) == 0 {
  440. // We haven't parsed the config yet, shouldn't happen
  441. panic("race, or lost startup event")
  442. }
  443. select {
  444. case <-p.startComplete:
  445. default:
  446. data := ev.Data.(map[string]interface{})
  447. to := data["to"].(string)
  448. if to == "idle" {
  449. folder := data["folder"].(string)
  450. delete(notScanned, folder)
  451. if len(notScanned) == 0 {
  452. close(p.startComplete)
  453. }
  454. }
  455. }
  456. case "LocalIndexUpdated":
  457. data := ev.Data.(map[string]interface{})
  458. folder := data["folder"].(string)
  459. version, _ := data["version"].(json.Number).Int64()
  460. p.eventMut.Lock()
  461. m := p.sequence[folder]
  462. if m == nil {
  463. m = make(map[string]int64)
  464. }
  465. device := p.id.String()
  466. if device == "" {
  467. panic("race, or startup not complete")
  468. }
  469. m[device] = version
  470. p.sequence[folder] = m
  471. p.done[folder] = false
  472. l.Debugf("LocalIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  473. p.eventMut.Unlock()
  474. case "RemoteIndexUpdated":
  475. data := ev.Data.(map[string]interface{})
  476. device := data["device"].(string)
  477. folder := data["folder"].(string)
  478. version, _ := data["version"].(json.Number).Int64()
  479. p.eventMut.Lock()
  480. m := p.sequence[folder]
  481. if m == nil {
  482. m = make(map[string]int64)
  483. }
  484. m[device] = version
  485. p.sequence[folder] = m
  486. p.done[folder] = false
  487. l.Debugf("RemoteIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  488. p.eventMut.Unlock()
  489. case "FolderSummary":
  490. data := ev.Data.(map[string]interface{})
  491. folder := data["folder"].(string)
  492. summary := data["summary"].(map[string]interface{})
  493. need, _ := summary["needBytes"].(json.Number).Int64()
  494. done := need == 0
  495. p.eventMut.Lock()
  496. p.done[folder] = done
  497. l.Debugf("Foldersummary %v %v\n\t%+v", p.id, folder, p.done)
  498. p.eventMut.Unlock()
  499. }
  500. }
  501. }
  502. }
  // ConnectionStats is the decoded form of one entry in the response from
  // the system/connections endpoint, see (*Process).Connections.
  type ConnectionStats struct {
  	Address, Type                 string
  	Connected, Paused             bool
  	ClientVersion                 string
  	InBytesTotal, OutBytesTotal   int64
  }
  512. func (p *Process) Connections() (map[string]ConnectionStats, error) {
  513. bs, err := p.Get("/rest/system/connections")
  514. if err != nil {
  515. return nil, err
  516. }
  517. var res map[string]ConnectionStats
  518. if err := json.Unmarshal(bs, &res); err != nil {
  519. return nil, err
  520. }
  521. return res, nil
  522. }
  523. type SystemStatus struct {
  524. Alloc int64
  525. CPUPercent float64
  526. Goroutines int
  527. MyID protocol.DeviceID
  528. PathSeparator string
  529. StartTime time.Time
  530. Sys int64
  531. Themes []string
  532. Tilde string
  533. Uptime int
  534. }
  535. func (p *Process) SystemStatus() (SystemStatus, error) {
  536. bs, err := p.Get("/rest/system/status")
  537. if err != nil {
  538. return SystemStatus{}, err
  539. }
  540. var res SystemStatus
  541. if err := json.Unmarshal(bs, &res); err != nil {
  542. return SystemStatus{}, err
  543. }
  544. return res, nil
  545. }
  // SystemVersion is the decoded form of the response from the
  // system/version endpoint, see (*Process).SystemVersion.
  type SystemVersion struct {
  	Arch, Codename, LongVersion, OS, Version string
  }
  553. func (p *Process) SystemVersion() (SystemVersion, error) {
  554. bs, err := p.Get("/rest/system/version")
  555. if err != nil {
  556. return SystemVersion{}, err
  557. }
  558. var res SystemVersion
  559. if err := json.Unmarshal(bs, &res); err != nil {
  560. return SystemVersion{}, err
  561. }
  562. return res, nil
  563. }