rc.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. // Package rc provides remote control of a Syncthing process via the REST API.
  7. package rc
  8. import (
  9. "bufio"
  10. "bytes"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "io/ioutil"
  16. "log"
  17. "net/http"
  18. "net/url"
  19. "os"
  20. "os/exec"
  21. "path/filepath"
  22. "strconv"
  23. "time"
  24. "github.com/syncthing/syncthing/lib/config"
  25. "github.com/syncthing/syncthing/lib/dialer"
  26. "github.com/syncthing/syncthing/lib/protocol"
  27. "github.com/syncthing/syncthing/lib/sync"
  28. )
  29. // APIKey is set via the STGUIAPIKEY variable when we launch the binary, to
  30. // ensure that we have API access regardless of authentication settings.
  31. const APIKey = "592A47BC-A7DF-4C2F-89E0-A80B3E5094EE"
  32. type Process struct {
  33. // Set at initialization
  34. addr string
  35. // Set by eventLoop()
  36. eventMut sync.Mutex
  37. id protocol.DeviceID
  38. folders []string
  39. startComplete chan struct{}
  40. stopped chan struct{}
  41. stopErr error
  42. sequence map[string]map[string]int64 // Folder ID => Device ID => Sequence
  43. done map[string]bool // Folder ID => 100%
  44. cmd *exec.Cmd
  45. logfd *os.File
  46. }
  47. // NewProcess returns a new Process talking to Syncthing at the specified address.
  48. // Example: NewProcess("127.0.0.1:8082")
  49. func NewProcess(addr string) *Process {
  50. p := &Process{
  51. addr: addr,
  52. sequence: make(map[string]map[string]int64),
  53. done: make(map[string]bool),
  54. eventMut: sync.NewMutex(),
  55. startComplete: make(chan struct{}),
  56. stopped: make(chan struct{}),
  57. }
  58. return p
  59. }
  60. func (p *Process) ID() protocol.DeviceID {
  61. return p.id
  62. }
  63. // LogTo creates the specified log file and ensures that stdout and stderr
  64. // from the Start()ed process is redirected there. Must be called before
  65. // Start().
  66. func (p *Process) LogTo(filename string) error {
  67. if p.cmd != nil {
  68. panic("logfd cannot be set with an existing cmd")
  69. }
  70. if p.logfd != nil {
  71. p.logfd.Close()
  72. }
  73. fd, err := os.Create(filename)
  74. if err != nil {
  75. return err
  76. }
  77. p.logfd = fd
  78. return nil
  79. }
  80. // Start runs the specified Syncthing binary with the given arguments.
  81. // Syncthing should be configured to provide an API on the address given to
  82. // NewProcess. Event processing is started.
  83. func (p *Process) Start(bin string, args ...string) error {
  84. cmd := exec.Command(bin, args...)
  85. if p.logfd != nil {
  86. cmd.Stdout = p.logfd
  87. cmd.Stderr = p.logfd
  88. }
  89. cmd.Env = append(os.Environ(), "STNORESTART=1", "STGUIAPIKEY="+APIKey)
  90. err := cmd.Start()
  91. if err != nil {
  92. return err
  93. }
  94. p.cmd = cmd
  95. go p.eventLoop()
  96. go p.wait()
  97. return nil
  98. }
  99. func (p *Process) wait() {
  100. p.cmd.Wait()
  101. if p.logfd != nil {
  102. p.stopErr = p.checkForProblems(p.logfd)
  103. }
  104. close(p.stopped)
  105. }
  106. // AwaitStartup waits for the Syncthing process to start and perform initial
  107. // scans of all folders.
  108. func (p *Process) AwaitStartup() {
  109. fmt.Println("awaiting startup")
  110. select {
  111. case <-p.startComplete:
  112. case <-p.stopped:
  113. }
  114. fmt.Println("awaited startup")
  115. }
  116. // Stop stops the running Syncthing process. If the process was logging to a
  117. // local file (set by LogTo), the log file will be opened and checked for
  118. // panics and data races. The presence of either will be signalled in the form
  119. // of a returned error.
  120. func (p *Process) Stop() (*os.ProcessState, error) {
  121. select {
  122. case <-p.stopped:
  123. return p.cmd.ProcessState, p.stopErr
  124. default:
  125. }
  126. if _, err := p.Post("/rest/system/shutdown", nil); err != nil && err != io.ErrUnexpectedEOF {
  127. // Unexpected EOF is somewhat expected here, as we may exit before
  128. // returning something sensible.
  129. return nil, err
  130. }
  131. <-p.stopped
  132. return p.cmd.ProcessState, p.stopErr
  133. }
  134. // Get performs an HTTP GET and returns the bytes and/or an error. Any non-200
  135. // return code is returned as an error.
  136. func (p *Process) Get(path string) ([]byte, error) {
  137. client := &http.Client{
  138. Timeout: 30 * time.Second,
  139. Transport: &http.Transport{
  140. Dial: dialer.Dial,
  141. Proxy: http.ProxyFromEnvironment,
  142. DisableKeepAlives: true,
  143. },
  144. }
  145. url := fmt.Sprintf("http://%s%s", p.addr, path)
  146. req, err := http.NewRequest("GET", url, nil)
  147. if err != nil {
  148. return nil, err
  149. }
  150. req.Header.Add("X-API-Key", APIKey)
  151. resp, err := client.Do(req)
  152. if err != nil {
  153. return nil, err
  154. }
  155. return p.readResponse(resp)
  156. }
  157. // Post performs an HTTP POST and returns the bytes and/or an error. Any
  158. // non-200 return code is returned as an error.
  159. func (p *Process) Post(path string, data io.Reader) ([]byte, error) {
  160. client := &http.Client{
  161. Timeout: 600 * time.Second,
  162. Transport: &http.Transport{
  163. DisableKeepAlives: true,
  164. },
  165. }
  166. url := fmt.Sprintf("http://%s%s", p.addr, path)
  167. req, err := http.NewRequest("POST", url, data)
  168. if err != nil {
  169. return nil, err
  170. }
  171. req.Header.Add("X-API-Key", APIKey)
  172. req.Header.Add("Content-Type", "application/json")
  173. resp, err := client.Do(req)
  174. if err != nil {
  175. return nil, err
  176. }
  177. return p.readResponse(resp)
  178. }
  179. type Event struct {
  180. ID int
  181. Time time.Time
  182. Type string
  183. Data interface{}
  184. }
  185. func (p *Process) Events(since int) ([]Event, error) {
  186. bs, err := p.Get(fmt.Sprintf("/rest/events?since=%d&timeout=10", since))
  187. if err != nil {
  188. return nil, err
  189. }
  190. var evs []Event
  191. dec := json.NewDecoder(bytes.NewReader(bs))
  192. dec.UseNumber()
  193. err = dec.Decode(&evs)
  194. if err != nil {
  195. return nil, fmt.Errorf("Events: %s in %q", err, bs)
  196. }
  197. return evs, err
  198. }
  199. func (p *Process) Rescan(folder string) error {
  200. _, err := p.Post("/rest/db/scan?folder="+url.QueryEscape(folder), nil)
  201. return err
  202. }
  203. func (p *Process) RescanDelay(folder string, delaySeconds int) error {
  204. _, err := p.Post(fmt.Sprintf("/rest/db/scan?folder=%s&next=%d", url.QueryEscape(folder), delaySeconds), nil)
  205. return err
  206. }
  207. func (p *Process) RescanSub(folder string, sub string, delaySeconds int) error {
  208. return p.RescanSubs(folder, []string{sub}, delaySeconds)
  209. }
  210. func (p *Process) RescanSubs(folder string, subs []string, delaySeconds int) error {
  211. data := url.Values{}
  212. data.Set("folder", folder)
  213. for _, sub := range subs {
  214. data.Add("sub", sub)
  215. }
  216. data.Set("next", strconv.Itoa(delaySeconds))
  217. _, err := p.Post("/rest/db/scan?"+data.Encode(), nil)
  218. return err
  219. }
  220. func (p *Process) ConfigInSync() (bool, error) {
  221. bs, err := p.Get("/rest/system/config/insync")
  222. if err != nil {
  223. return false, err
  224. }
  225. return bytes.Contains(bs, []byte("true")), nil
  226. }
  227. func (p *Process) GetConfig() (config.Configuration, error) {
  228. var cfg config.Configuration
  229. bs, err := p.Get("/rest/system/config")
  230. if err != nil {
  231. return cfg, err
  232. }
  233. err = json.Unmarshal(bs, &cfg)
  234. return cfg, err
  235. }
  236. func (p *Process) PostConfig(cfg config.Configuration) error {
  237. buf := new(bytes.Buffer)
  238. if err := json.NewEncoder(buf).Encode(cfg); err != nil {
  239. return err
  240. }
  241. _, err := p.Post("/rest/system/config", buf)
  242. return err
  243. }
  244. func (p *Process) PauseDevice(dev protocol.DeviceID) error {
  245. _, err := p.Post("/rest/system/pause?device="+dev.String(), nil)
  246. return err
  247. }
  248. func (p *Process) ResumeDevice(dev protocol.DeviceID) error {
  249. _, err := p.Post("/rest/system/resume?device="+dev.String(), nil)
  250. return err
  251. }
  252. func (p *Process) PauseAll() error {
  253. _, err := p.Post("/rest/system/pause", nil)
  254. return err
  255. }
  256. func (p *Process) ResumeAll() error {
  257. _, err := p.Post("/rest/system/resume", nil)
  258. return err
  259. }
  260. func InSync(folder string, ps ...*Process) bool {
  261. for _, p := range ps {
  262. p.eventMut.Lock()
  263. }
  264. defer func() {
  265. for _, p := range ps {
  266. p.eventMut.Unlock()
  267. }
  268. }()
  269. for i := range ps {
  270. // If our latest FolderSummary didn't report 100%, then we are not done.
  271. if !ps[i].done[folder] {
  272. return false
  273. }
  274. // Check Sequence for each device. The local version seen by remote
  275. // devices should be the same as what it has locally, or the index
  276. // hasn't been sent yet.
  277. sourceID := ps[i].id.String()
  278. sourceSeq := ps[i].sequence[folder][sourceID]
  279. l.Debugf("sourceSeq = ps[%d].sequence[%q][%q] = %d", i, folder, sourceID, sourceSeq)
  280. for j := range ps {
  281. if i != j {
  282. remoteSeq := ps[j].sequence[folder][sourceID]
  283. if remoteSeq != sourceSeq {
  284. l.Debugf("remoteSeq = ps[%d].sequence[%q][%q] = %d", j, folder, sourceID, remoteSeq)
  285. return false
  286. }
  287. }
  288. }
  289. }
  290. return true
  291. }
  292. func AwaitSync(folder string, ps ...*Process) {
  293. for {
  294. time.Sleep(250 * time.Millisecond)
  295. if InSync(folder, ps...) {
  296. return
  297. }
  298. }
  299. }
  300. type Model struct {
  301. GlobalBytes int
  302. GlobalDeleted int
  303. GlobalFiles int
  304. InSyncBytes int
  305. InSyncFiles int
  306. Invalid string
  307. LocalBytes int
  308. LocalDeleted int
  309. LocalFiles int
  310. NeedBytes int
  311. NeedFiles int
  312. State string
  313. StateChanged time.Time
  314. Version int
  315. }
  316. func (p *Process) Model(folder string) (Model, error) {
  317. bs, err := p.Get("/rest/db/status?folder=" + url.QueryEscape(folder))
  318. if err != nil {
  319. return Model{}, err
  320. }
  321. var res Model
  322. if err := json.Unmarshal(bs, &res); err != nil {
  323. return Model{}, err
  324. }
  325. l.Debugf("%+v", res)
  326. return res, nil
  327. }
  328. func (p *Process) readResponse(resp *http.Response) ([]byte, error) {
  329. bs, err := ioutil.ReadAll(resp.Body)
  330. resp.Body.Close()
  331. if err != nil {
  332. return bs, err
  333. }
  334. if resp.StatusCode != 200 {
  335. return bs, fmt.Errorf("%s", resp.Status)
  336. }
  337. return bs, nil
  338. }
  339. func (p *Process) checkForProblems(logfd *os.File) error {
  340. fd, err := os.Open(logfd.Name())
  341. if err != nil {
  342. return err
  343. }
  344. defer fd.Close()
  345. raceConditionStart := []byte("WARNING: DATA RACE")
  346. raceConditionSep := []byte("==================")
  347. panicConditionStart := []byte("panic:")
  348. p.eventMut.Lock()
  349. panicConditionSep := []byte("[") // fallback if we don't already know our ID
  350. if p.id.String() != "" {
  351. panicConditionSep = []byte(p.id.String()[:5])
  352. }
  353. sc := bufio.NewScanner(fd)
  354. race := false
  355. _panic := false
  356. for sc.Scan() {
  357. line := sc.Bytes()
  358. if race || _panic {
  359. if bytes.Contains(line, panicConditionSep) {
  360. _panic = false
  361. continue
  362. }
  363. fmt.Printf("%s\n", line)
  364. if bytes.Contains(line, raceConditionSep) {
  365. race = false
  366. }
  367. } else if bytes.Contains(line, raceConditionStart) {
  368. fmt.Printf("%s\n", raceConditionSep)
  369. fmt.Printf("%s\n", raceConditionStart)
  370. race = true
  371. if err == nil {
  372. err = errors.New("Race condition detected")
  373. }
  374. } else if bytes.Contains(line, panicConditionStart) {
  375. _panic = true
  376. if err == nil {
  377. err = errors.New("Panic detected")
  378. }
  379. }
  380. }
  381. return err
  382. }
  383. func (p *Process) eventLoop() {
  384. since := 0
  385. notScanned := make(map[string]struct{})
  386. start := time.Now()
  387. for {
  388. select {
  389. case <-p.stopped:
  390. return
  391. default:
  392. }
  393. events, err := p.Events(since)
  394. if err != nil {
  395. if time.Since(start) < 5*time.Second {
  396. // The API has probably not started yet, lets give it some time.
  397. continue
  398. }
  399. // If we're stopping, no need to print the error.
  400. select {
  401. case <-p.stopped:
  402. return
  403. default:
  404. }
  405. log.Println("eventLoop: events:", err)
  406. continue
  407. }
  408. for _, ev := range events {
  409. if ev.ID != since+1 {
  410. l.Warnln("Event ID jumped", since, "to", ev.ID)
  411. }
  412. since = ev.ID
  413. switch ev.Type {
  414. case "Starting":
  415. // The Starting event tells us where the configuration is. Load
  416. // it and populate our list of folders.
  417. data := ev.Data.(map[string]interface{})
  418. id, err := protocol.DeviceIDFromString(data["myID"].(string))
  419. if err != nil {
  420. log.Println("eventLoop: DeviceIdFromString:", err)
  421. continue
  422. }
  423. p.id = id
  424. home := data["home"].(string)
  425. w, err := config.Load(filepath.Join(home, "config.xml"), protocol.LocalDeviceID)
  426. if err != nil {
  427. log.Println("eventLoop: Starting:", err)
  428. continue
  429. }
  430. for id := range w.Folders() {
  431. p.eventMut.Lock()
  432. p.folders = append(p.folders, id)
  433. p.eventMut.Unlock()
  434. notScanned[id] = struct{}{}
  435. }
  436. l.Debugln("Started", p.id)
  437. case "StateChanged":
  438. // When a folder changes to idle, we tick it off by removing
  439. // it from p.notScanned.
  440. if len(p.folders) == 0 {
  441. // We haven't parsed the config yet, shouldn't happen
  442. panic("race, or lost startup event")
  443. }
  444. select {
  445. case <-p.startComplete:
  446. default:
  447. data := ev.Data.(map[string]interface{})
  448. to := data["to"].(string)
  449. if to == "idle" {
  450. folder := data["folder"].(string)
  451. delete(notScanned, folder)
  452. if len(notScanned) == 0 {
  453. close(p.startComplete)
  454. }
  455. }
  456. }
  457. case "LocalIndexUpdated":
  458. data := ev.Data.(map[string]interface{})
  459. folder := data["folder"].(string)
  460. version, _ := data["version"].(json.Number).Int64()
  461. p.eventMut.Lock()
  462. m := p.sequence[folder]
  463. if m == nil {
  464. m = make(map[string]int64)
  465. }
  466. device := p.id.String()
  467. if device == "" {
  468. panic("race, or startup not complete")
  469. }
  470. m[device] = version
  471. p.sequence[folder] = m
  472. p.done[folder] = false
  473. l.Debugf("LocalIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  474. p.eventMut.Unlock()
  475. case "RemoteIndexUpdated":
  476. data := ev.Data.(map[string]interface{})
  477. device := data["device"].(string)
  478. folder := data["folder"].(string)
  479. version, _ := data["version"].(json.Number).Int64()
  480. p.eventMut.Lock()
  481. m := p.sequence[folder]
  482. if m == nil {
  483. m = make(map[string]int64)
  484. }
  485. m[device] = version
  486. p.sequence[folder] = m
  487. p.done[folder] = false
  488. l.Debugf("RemoteIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  489. p.eventMut.Unlock()
  490. case "FolderSummary":
  491. data := ev.Data.(map[string]interface{})
  492. folder := data["folder"].(string)
  493. summary := data["summary"].(map[string]interface{})
  494. need, _ := summary["needBytes"].(json.Number).Int64()
  495. done := need == 0
  496. p.eventMut.Lock()
  497. p.done[folder] = done
  498. l.Debugf("Foldersummary %v %v\n\t%+v", p.id, folder, p.done)
  499. p.eventMut.Unlock()
  500. }
  501. }
  502. }
  503. }
  504. type ConnectionStats struct {
  505. Address string
  506. Type string
  507. Connected bool
  508. Paused bool
  509. ClientVersion string
  510. InBytesTotal int64
  511. OutBytesTotal int64
  512. }
  513. func (p *Process) Connections() (map[string]ConnectionStats, error) {
  514. bs, err := p.Get("/rest/system/connections")
  515. if err != nil {
  516. return nil, err
  517. }
  518. var res map[string]ConnectionStats
  519. if err := json.Unmarshal(bs, &res); err != nil {
  520. return nil, err
  521. }
  522. return res, nil
  523. }
  524. type SystemStatus struct {
  525. Alloc int64
  526. CPUPercent float64
  527. Goroutines int
  528. MyID protocol.DeviceID
  529. PathSeparator string
  530. StartTime time.Time
  531. Sys int64
  532. Themes []string
  533. Tilde string
  534. Uptime int
  535. }
  536. func (p *Process) SystemStatus() (SystemStatus, error) {
  537. bs, err := p.Get("/rest/system/status")
  538. if err != nil {
  539. return SystemStatus{}, err
  540. }
  541. var res SystemStatus
  542. if err := json.Unmarshal(bs, &res); err != nil {
  543. return SystemStatus{}, err
  544. }
  545. return res, nil
  546. }
  547. type SystemVersion struct {
  548. Arch string
  549. Codename string
  550. LongVersion string
  551. OS string
  552. Version string
  553. }
  554. func (p *Process) SystemVersion() (SystemVersion, error) {
  555. bs, err := p.Get("/rest/system/version")
  556. if err != nil {
  557. return SystemVersion{}, err
  558. }
  559. var res SystemVersion
  560. if err := json.Unmarshal(bs, &res); err != nil {
  561. return SystemVersion{}, err
  562. }
  563. return res, nil
  564. }