rc.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. // Package rc provides remote control of a Syncthing process via the REST API.
  7. package rc
  8. import (
  9. "bufio"
  10. "bytes"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "io/ioutil"
  16. "log"
  17. "net/http"
  18. "net/url"
  19. "os"
  20. "os/exec"
  21. "path/filepath"
  22. "strconv"
  23. "time"
  24. "github.com/syncthing/syncthing/lib/config"
  25. "github.com/syncthing/syncthing/lib/dialer"
  26. "github.com/syncthing/syncthing/lib/events"
  27. "github.com/syncthing/syncthing/lib/model"
  28. "github.com/syncthing/syncthing/lib/protocol"
  29. "github.com/syncthing/syncthing/lib/sync"
  30. )
// APIKey is set via the STGUIAPIKEY variable when we launch the binary, to
// ensure that we have API access regardless of authentication settings.
// Get and Post send the same value in the X-API-Key request header.
const APIKey = "592A47BC-A7DF-4C2F-89E0-A80B3E5094EE"
// Process is a handle on a Syncthing process that is launched by Start and
// then controlled and observed through its REST API. Create instances with
// NewProcess, which allocates the maps and channels used below.
type Process struct {
	// Set at initialization
	// addr is the host:port that REST request URLs are built from.
	addr string

	// Set by eventLoop()
	// eventMut is held by eventLoop() while it updates folders, sequence and
	// done, and by InSync while it reads the tracked state.
	eventMut sync.Mutex
	// id is taken from the "myID" field of the "Starting" event.
	id protocol.DeviceID
	// folders lists the folder IDs found in the loaded configuration.
	folders []string
	// startComplete is closed by eventLoop() once every folder from the
	// configuration has reported a state change to "idle".
	startComplete chan struct{}
	// stopped is closed by wait() after the child process has exited.
	stopped chan struct{}
	// stopErr is the result of the log file check performed by wait().
	stopErr  error
	sequence map[string]map[string]int64 // Folder ID => Device ID => Sequence
	done     map[string]bool             // Folder ID => 100%

	// cmd is the child process; it is set by Start().
	cmd *exec.Cmd
	// logfd, when set by LogTo(), receives the child's stdout and stderr.
	logfd *os.File
}
  49. // NewProcess returns a new Process talking to Syncthing at the specified address.
  50. // Example: NewProcess("127.0.0.1:8082")
  51. func NewProcess(addr string) *Process {
  52. p := &Process{
  53. addr: addr,
  54. sequence: make(map[string]map[string]int64),
  55. done: make(map[string]bool),
  56. eventMut: sync.NewMutex(),
  57. startComplete: make(chan struct{}),
  58. stopped: make(chan struct{}),
  59. }
  60. return p
  61. }
// ID returns the device ID recorded for this process. The value is filled in
// by eventLoop() when the "Starting" event is seen and is the zero DeviceID
// before that. The field is read without taking eventMut.
func (p *Process) ID() protocol.DeviceID {
	return p.id
}
  65. // LogTo creates the specified log file and ensures that stdout and stderr
  66. // from the Start()ed process is redirected there. Must be called before
  67. // Start().
  68. func (p *Process) LogTo(filename string) error {
  69. if p.cmd != nil {
  70. panic("logfd cannot be set with an existing cmd")
  71. }
  72. if p.logfd != nil {
  73. p.logfd.Close()
  74. }
  75. fd, err := os.Create(filename)
  76. if err != nil {
  77. return err
  78. }
  79. p.logfd = fd
  80. return nil
  81. }
  82. // Start runs the specified Syncthing binary with the given arguments.
  83. // Syncthing should be configured to provide an API on the address given to
  84. // NewProcess. Event processing is started.
  85. func (p *Process) Start(bin string, args ...string) error {
  86. cmd := exec.Command(bin, args...)
  87. if p.logfd != nil {
  88. cmd.Stdout = p.logfd
  89. cmd.Stderr = p.logfd
  90. }
  91. cmd.Env = append(os.Environ(), "STNORESTART=1", "STGUIAPIKEY="+APIKey)
  92. err := cmd.Start()
  93. if err != nil {
  94. return err
  95. }
  96. p.cmd = cmd
  97. go p.eventLoop()
  98. go p.wait()
  99. return nil
  100. }
  101. func (p *Process) wait() {
  102. p.cmd.Wait()
  103. if p.logfd != nil {
  104. p.stopErr = p.checkForProblems(p.logfd)
  105. }
  106. close(p.stopped)
  107. }
  108. // AwaitStartup waits for the Syncthing process to start and perform initial
  109. // scans of all folders.
  110. func (p *Process) AwaitStartup() {
  111. select {
  112. case <-p.startComplete:
  113. case <-p.stopped:
  114. }
  115. }
  116. // Stop stops the running Syncthing process. If the process was logging to a
  117. // local file (set by LogTo), the log file will be opened and checked for
  118. // panics and data races. The presence of either will be signalled in the form
  119. // of a returned error.
  120. func (p *Process) Stop() (*os.ProcessState, error) {
  121. select {
  122. case <-p.stopped:
  123. return p.cmd.ProcessState, p.stopErr
  124. default:
  125. }
  126. if _, err := p.Post("/rest/system/shutdown", nil); err != nil && err != io.ErrUnexpectedEOF {
  127. // Unexpected EOF is somewhat expected here, as we may exit before
  128. // returning something sensible.
  129. return nil, err
  130. }
  131. <-p.stopped
  132. return p.cmd.ProcessState, p.stopErr
  133. }
// Stopped returns a channel that will be closed when Syncthing has stopped.
// The channel is the one closed by wait() after the child process has exited
// and the optional log file check has run.
func (p *Process) Stopped() chan struct{} {
	return p.stopped
}
  138. // Get performs an HTTP GET and returns the bytes and/or an error. Any non-200
  139. // return code is returned as an error.
  140. func (p *Process) Get(path string) ([]byte, error) {
  141. client := &http.Client{
  142. Timeout: 30 * time.Second,
  143. Transport: &http.Transport{
  144. DialContext: dialer.DialContext,
  145. Proxy: http.ProxyFromEnvironment,
  146. DisableKeepAlives: true,
  147. },
  148. }
  149. url := fmt.Sprintf("http://%s%s", p.addr, path)
  150. req, err := http.NewRequest("GET", url, nil)
  151. if err != nil {
  152. return nil, err
  153. }
  154. req.Header.Add("X-API-Key", APIKey)
  155. resp, err := client.Do(req)
  156. if err != nil {
  157. return nil, err
  158. }
  159. return p.readResponse(resp)
  160. }
  161. // Post performs an HTTP POST and returns the bytes and/or an error. Any
  162. // non-200 return code is returned as an error.
  163. func (p *Process) Post(path string, data io.Reader) ([]byte, error) {
  164. client := &http.Client{
  165. Timeout: 600 * time.Second,
  166. Transport: &http.Transport{
  167. DisableKeepAlives: true,
  168. },
  169. }
  170. url := fmt.Sprintf("http://%s%s", p.addr, path)
  171. req, err := http.NewRequest("POST", url, data)
  172. if err != nil {
  173. return nil, err
  174. }
  175. req.Header.Add("X-API-Key", APIKey)
  176. req.Header.Add("Content-Type", "application/json")
  177. resp, err := client.Do(req)
  178. if err != nil {
  179. return nil, err
  180. }
  181. return p.readResponse(resp)
  182. }
// Event is a single entry decoded from the JSON array returned by the
// /rest/events endpoint (see Events).
type Event struct {
	// ID is the event's number; eventLoop() expects consecutive IDs and
	// passes the latest one back as the "since" parameter.
	ID   int
	Time time.Time
	// Type names the kind of event, e.g. "Starting" or "FolderSummary".
	Type string
	// Data is the event payload as decoded by encoding/json. Events decodes
	// with UseNumber, so numeric values inside it are json.Number.
	Data interface{}
}
  189. func (p *Process) Events(since int) ([]Event, error) {
  190. bs, err := p.Get(fmt.Sprintf("/rest/events?since=%d&timeout=10", since))
  191. if err != nil {
  192. return nil, err
  193. }
  194. var evs []Event
  195. dec := json.NewDecoder(bytes.NewReader(bs))
  196. dec.UseNumber()
  197. err = dec.Decode(&evs)
  198. if err != nil {
  199. return nil, fmt.Errorf("events: %w in %q", err, bs)
  200. }
  201. return evs, err
  202. }
  203. func (p *Process) Rescan(folder string) error {
  204. _, err := p.Post("/rest/db/scan?folder="+url.QueryEscape(folder), nil)
  205. return err
  206. }
  207. func (p *Process) RescanDelay(folder string, delaySeconds int) error {
  208. _, err := p.Post(fmt.Sprintf("/rest/db/scan?folder=%s&next=%d", url.QueryEscape(folder), delaySeconds), nil)
  209. return err
  210. }
  211. func (p *Process) RescanSub(folder string, sub string, delaySeconds int) error {
  212. return p.RescanSubs(folder, []string{sub}, delaySeconds)
  213. }
  214. func (p *Process) RescanSubs(folder string, subs []string, delaySeconds int) error {
  215. data := url.Values{}
  216. data.Set("folder", folder)
  217. for _, sub := range subs {
  218. data.Add("sub", sub)
  219. }
  220. data.Set("next", strconv.Itoa(delaySeconds))
  221. _, err := p.Post("/rest/db/scan?"+data.Encode(), nil)
  222. return err
  223. }
  224. func (p *Process) ConfigInSync() (bool, error) {
  225. bs, err := p.Get("/rest/system/config/insync")
  226. if err != nil {
  227. return false, err
  228. }
  229. return bytes.Contains(bs, []byte("true")), nil
  230. }
  231. func (p *Process) GetConfig() (config.Configuration, error) {
  232. var cfg config.Configuration
  233. bs, err := p.Get("/rest/system/config")
  234. if err != nil {
  235. return cfg, err
  236. }
  237. err = json.Unmarshal(bs, &cfg)
  238. return cfg, err
  239. }
  240. func (p *Process) PostConfig(cfg config.Configuration) error {
  241. buf := new(bytes.Buffer)
  242. if err := json.NewEncoder(buf).Encode(cfg); err != nil {
  243. return err
  244. }
  245. _, err := p.Post("/rest/system/config", buf)
  246. return err
  247. }
  248. func (p *Process) PauseDevice(dev protocol.DeviceID) error {
  249. _, err := p.Post("/rest/system/pause?device="+dev.String(), nil)
  250. return err
  251. }
  252. func (p *Process) ResumeDevice(dev protocol.DeviceID) error {
  253. _, err := p.Post("/rest/system/resume?device="+dev.String(), nil)
  254. return err
  255. }
  256. func (p *Process) PauseAll() error {
  257. _, err := p.Post("/rest/system/pause", nil)
  258. return err
  259. }
  260. func (p *Process) ResumeAll() error {
  261. _, err := p.Post("/rest/system/resume", nil)
  262. return err
  263. }
  264. func InSync(folder string, ps ...*Process) bool {
  265. for _, p := range ps {
  266. p.eventMut.Lock()
  267. }
  268. defer func() {
  269. for _, p := range ps {
  270. p.eventMut.Unlock()
  271. }
  272. }()
  273. for i := range ps {
  274. // If our latest FolderSummary didn't report 100%, then we are not done.
  275. if !ps[i].done[folder] {
  276. l.Debugf("done = ps[%d].done[%q] = false", i, folder)
  277. return false
  278. }
  279. // Check Sequence for each device. The local version seen by remote
  280. // devices should be the same as what it has locally, or the index
  281. // hasn't been sent yet.
  282. sourceID := ps[i].id.String()
  283. sourceSeq := ps[i].sequence[folder][sourceID]
  284. l.Debugf("sourceSeq = ps[%d].sequence[%q][%q] = %d", i, folder, sourceID, sourceSeq)
  285. for j := range ps {
  286. if i != j {
  287. remoteSeq := ps[j].sequence[folder][sourceID]
  288. if remoteSeq != sourceSeq {
  289. l.Debugf("remoteSeq = ps[%d].sequence[%q][%q] = %d", j, folder, sourceID, remoteSeq)
  290. return false
  291. }
  292. }
  293. }
  294. }
  295. return true
  296. }
  297. func AwaitSync(folder string, ps ...*Process) {
  298. for {
  299. time.Sleep(250 * time.Millisecond)
  300. if InSync(folder, ps...) {
  301. return
  302. }
  303. }
  304. }
// Model holds the folder status decoded from the JSON response of
// /rest/db/status (see the Model method). The fields have no JSON tags, so
// encoding/json matches them to response keys by name, ignoring case.
// NOTE(review): the counters are plain int; presumably the byte counts can
// exceed 32 bits on 32-bit platforms - confirm whether that matters here.
type Model struct {
	GlobalBytes   int
	GlobalDeleted int
	GlobalFiles   int
	InSyncBytes   int
	InSyncFiles   int
	Invalid       string
	LocalBytes    int
	LocalDeleted  int
	LocalFiles    int
	NeedBytes     int
	NeedFiles     int
	State         string
	StateChanged  time.Time
	Version       int
}
  321. func (p *Process) Model(folder string) (Model, error) {
  322. bs, err := p.Get("/rest/db/status?folder=" + url.QueryEscape(folder))
  323. if err != nil {
  324. return Model{}, err
  325. }
  326. var res Model
  327. if err := json.Unmarshal(bs, &res); err != nil {
  328. return Model{}, err
  329. }
  330. l.Debugf("%+v", res)
  331. return res, nil
  332. }
  333. func (p *Process) readResponse(resp *http.Response) ([]byte, error) {
  334. bs, err := ioutil.ReadAll(resp.Body)
  335. resp.Body.Close()
  336. if err != nil {
  337. return bs, err
  338. }
  339. if resp.StatusCode != 200 {
  340. return bs, errors.New(resp.Status)
  341. }
  342. return bs, nil
  343. }
  344. func (p *Process) checkForProblems(logfd *os.File) error {
  345. fd, err := os.Open(logfd.Name())
  346. if err != nil {
  347. return err
  348. }
  349. defer fd.Close()
  350. raceConditionStart := []byte("WARNING: DATA RACE")
  351. raceConditionSep := []byte("==================")
  352. panicConditionStart := []byte("panic:")
  353. panicConditionSep := []byte("[") // fallback if we don't already know our ID
  354. if p.id.String() != "" {
  355. panicConditionSep = []byte(p.id.String()[:5])
  356. }
  357. sc := bufio.NewScanner(fd)
  358. race := false
  359. _panic := false
  360. for sc.Scan() {
  361. line := sc.Bytes()
  362. if race || _panic {
  363. if bytes.Contains(line, panicConditionSep) {
  364. _panic = false
  365. continue
  366. }
  367. fmt.Printf("%s\n", line)
  368. if bytes.Contains(line, raceConditionSep) {
  369. race = false
  370. }
  371. } else if bytes.Contains(line, raceConditionStart) {
  372. fmt.Printf("%s\n", raceConditionSep)
  373. fmt.Printf("%s\n", raceConditionStart)
  374. race = true
  375. if err == nil {
  376. err = errors.New("Race condition detected")
  377. }
  378. } else if bytes.Contains(line, panicConditionStart) {
  379. _panic = true
  380. if err == nil {
  381. err = errors.New("Panic detected")
  382. }
  383. }
  384. }
  385. return err
  386. }
  387. func (p *Process) eventLoop() {
  388. since := 0
  389. notScanned := make(map[string]struct{})
  390. start := time.Now()
  391. for {
  392. select {
  393. case <-p.stopped:
  394. return
  395. default:
  396. }
  397. evs, err := p.Events(since)
  398. if err != nil {
  399. if time.Since(start) < 5*time.Second {
  400. // The API has probably not started yet, lets give it some time.
  401. continue
  402. }
  403. // If we're stopping, no need to print the error.
  404. select {
  405. case <-p.stopped:
  406. return
  407. default:
  408. }
  409. log.Println("eventLoop: events:", err)
  410. continue
  411. }
  412. for _, ev := range evs {
  413. if ev.ID != since+1 {
  414. l.Warnln("Event ID jumped", since, "to", ev.ID)
  415. }
  416. since = ev.ID
  417. switch ev.Type {
  418. case "Starting":
  419. // The Starting event tells us where the configuration is. Load
  420. // it and populate our list of folders.
  421. data := ev.Data.(map[string]interface{})
  422. id, err := protocol.DeviceIDFromString(data["myID"].(string))
  423. if err != nil {
  424. log.Println("eventLoop: DeviceIdFromString:", err)
  425. continue
  426. }
  427. p.id = id
  428. home := data["home"].(string)
  429. w, _, err := config.Load(filepath.Join(home, "config.xml"), protocol.LocalDeviceID, events.NoopLogger)
  430. if err != nil {
  431. log.Println("eventLoop: Starting:", err)
  432. continue
  433. }
  434. for id := range w.Folders() {
  435. p.eventMut.Lock()
  436. p.folders = append(p.folders, id)
  437. p.eventMut.Unlock()
  438. notScanned[id] = struct{}{}
  439. }
  440. l.Debugln("Started", p.id)
  441. case "StateChanged":
  442. // When a folder changes to idle, we tick it off by removing
  443. // it from p.notScanned.
  444. if len(p.folders) == 0 {
  445. // We haven't parsed the config yet, shouldn't happen
  446. panic("race, or lost startup event")
  447. }
  448. select {
  449. case <-p.startComplete:
  450. default:
  451. data := ev.Data.(map[string]interface{})
  452. to := data["to"].(string)
  453. if to == "idle" {
  454. folder := data["folder"].(string)
  455. delete(notScanned, folder)
  456. if len(notScanned) == 0 {
  457. close(p.startComplete)
  458. }
  459. }
  460. }
  461. case "LocalIndexUpdated":
  462. data := ev.Data.(map[string]interface{})
  463. folder := data["folder"].(string)
  464. p.eventMut.Lock()
  465. m := p.updateSequenceLocked(folder, p.id.String(), data["sequence"])
  466. p.done[folder] = false
  467. l.Debugf("LocalIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  468. p.eventMut.Unlock()
  469. case "RemoteIndexUpdated":
  470. data := ev.Data.(map[string]interface{})
  471. device := data["device"].(string)
  472. folder := data["folder"].(string)
  473. p.eventMut.Lock()
  474. m := p.updateSequenceLocked(folder, device, data["sequence"])
  475. p.done[folder] = false
  476. l.Debugf("RemoteIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  477. p.eventMut.Unlock()
  478. case "FolderSummary":
  479. data := ev.Data.(map[string]interface{})
  480. folder := data["folder"].(string)
  481. summary := data["summary"].(map[string]interface{})
  482. need, _ := summary["needTotalItems"].(json.Number).Int64()
  483. done := need == 0
  484. p.eventMut.Lock()
  485. m := p.updateSequenceLocked(folder, p.id.String(), summary["sequence"])
  486. p.done[folder] = done
  487. l.Debugf("FolderSummary %v %v\n\t%+v\n\t%+v", p.id, folder, p.done, m)
  488. p.eventMut.Unlock()
  489. case "FolderCompletion":
  490. data := ev.Data.(map[string]interface{})
  491. device := data["device"].(string)
  492. folder := data["folder"].(string)
  493. p.eventMut.Lock()
  494. m := p.updateSequenceLocked(folder, device, data["sequence"])
  495. l.Debugf("FolderCompletion %v\n\t%+v", p.id, folder, m)
  496. p.eventMut.Unlock()
  497. }
  498. }
  499. }
  500. }
  501. func (p *Process) updateSequenceLocked(folder, device string, sequenceIntf interface{}) map[string]int64 {
  502. sequence, _ := sequenceIntf.(json.Number).Int64()
  503. m := p.sequence[folder]
  504. if m == nil {
  505. m = make(map[string]int64)
  506. }
  507. m[device] = sequence
  508. p.sequence[folder] = m
  509. return m
  510. }
// ConnectionStats is the value type of the map decoded from the JSON
// response of /rest/system/connections (see Connections). The fields have no
// JSON tags, so encoding/json matches them to response keys by name,
// ignoring case.
type ConnectionStats struct {
	Address       string
	Type          string
	Connected     bool
	Paused        bool
	ClientVersion string
	InBytesTotal  int64
	OutBytesTotal int64
}
  520. func (p *Process) Connections() (map[string]ConnectionStats, error) {
  521. bs, err := p.Get("/rest/system/connections")
  522. if err != nil {
  523. return nil, err
  524. }
  525. var res map[string]ConnectionStats
  526. if err := json.Unmarshal(bs, &res); err != nil {
  527. return nil, err
  528. }
  529. return res, nil
  530. }
// SystemStatus holds the data decoded from the JSON response of
// /rest/system/status (see the SystemStatus method). The fields have no JSON
// tags, so encoding/json matches them to response keys by name, ignoring
// case.
type SystemStatus struct {
	Alloc         int64
	Goroutines    int
	MyID          protocol.DeviceID
	PathSeparator string
	StartTime     time.Time
	Sys           int64
	Themes        []string
	Tilde         string
	Uptime        int
}
  542. func (p *Process) SystemStatus() (SystemStatus, error) {
  543. bs, err := p.Get("/rest/system/status")
  544. if err != nil {
  545. return SystemStatus{}, err
  546. }
  547. var res SystemStatus
  548. if err := json.Unmarshal(bs, &res); err != nil {
  549. return SystemStatus{}, err
  550. }
  551. return res, nil
  552. }
// SystemVersion holds the data decoded from the JSON response of
// /rest/system/version (see the SystemVersion method). The fields have no
// JSON tags, so encoding/json matches them to response keys by name,
// ignoring case.
type SystemVersion struct {
	Arch        string
	Codename    string
	LongVersion string
	OS          string
	Version     string
}
  560. func (p *Process) SystemVersion() (SystemVersion, error) {
  561. bs, err := p.Get("/rest/system/version")
  562. if err != nil {
  563. return SystemVersion{}, err
  564. }
  565. var res SystemVersion
  566. if err := json.Unmarshal(bs, &res); err != nil {
  567. return SystemVersion{}, err
  568. }
  569. return res, nil
  570. }
  571. func (p *Process) RemoteInSync(folder string, dev protocol.DeviceID) (bool, error) {
  572. bs, err := p.Get(fmt.Sprintf("/rest/db/completion?folder=%v&device=%v", url.QueryEscape(folder), dev))
  573. if err != nil {
  574. return false, err
  575. }
  576. var comp model.FolderCompletion
  577. if err := json.Unmarshal(bs, &comp); err != nil {
  578. return false, err
  579. }
  580. return comp.NeedItems+comp.NeedDeletes == 0, nil
  581. }