rc.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. // Package rc provides remote control of a Syncthing process via the REST API.
  7. package rc
  8. import (
  9. "bufio"
  10. "bytes"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "io/ioutil"
  16. "log"
  17. "net/http"
  18. "net/url"
  19. "os"
  20. "os/exec"
  21. "path/filepath"
  22. "strconv"
  23. stdsync "sync"
  24. "time"
  25. "github.com/syncthing/syncthing/lib/config"
  26. "github.com/syncthing/syncthing/lib/protocol"
  27. "github.com/syncthing/syncthing/lib/sync"
  28. )
  29. // We set the API key via the STGUIAPIKEY variable when we launch the binary,
  30. // to ensure that we have API access regardless of authentication settings.
  31. const APIKey = "592A47BC-A7DF-4C2F-89E0-A80B3E5094EE"
  32. type Process struct {
  33. // Set at initialization
  34. addr string
  35. // Set by eventLoop()
  36. eventMut sync.Mutex
  37. id protocol.DeviceID
  38. folders []string
  39. startComplete bool
  40. startCompleteCond *stdsync.Cond
  41. stop bool
  42. localVersion map[string]map[string]int64 // Folder ID => Device ID => LocalVersion
  43. done map[string]bool // Folder ID => 100%
  44. cmd *exec.Cmd
  45. logfd *os.File
  46. }
  47. // NewProcess returns a new Process talking to Syncthing at the specified address.
  48. // Example: NewProcess("127.0.0.1:8082")
  49. func NewProcess(addr string) *Process {
  50. p := &Process{
  51. addr: addr,
  52. localVersion: make(map[string]map[string]int64),
  53. done: make(map[string]bool),
  54. eventMut: sync.NewMutex(),
  55. }
  56. p.startCompleteCond = stdsync.NewCond(p.eventMut)
  57. return p
  58. }
  59. func (p *Process) ID() protocol.DeviceID {
  60. return p.id
  61. }
  62. // LogTo creates the specified log file and ensures that stdout and stderr
  63. // from the Start()ed process is redirected there. Must be called before
  64. // Start().
  65. func (p *Process) LogTo(filename string) error {
  66. if p.cmd != nil {
  67. panic("logfd cannot be set with an existing cmd")
  68. }
  69. if p.logfd != nil {
  70. p.logfd.Close()
  71. }
  72. fd, err := os.Create(filename)
  73. if err != nil {
  74. return err
  75. }
  76. p.logfd = fd
  77. return nil
  78. }
  79. // Start runs the specified Syncthing binary with the given arguments.
  80. // Syncthing should be configured to provide an API on the address given to
  81. // NewProcess. Event processing is started.
  82. func (p *Process) Start(bin string, args ...string) error {
  83. cmd := exec.Command(bin, args...)
  84. if p.logfd != nil {
  85. cmd.Stdout = p.logfd
  86. cmd.Stderr = p.logfd
  87. }
  88. cmd.Env = append(os.Environ(), "STNORESTART=1", "STGUIAPIKEY="+APIKey)
  89. err := cmd.Start()
  90. if err != nil {
  91. return err
  92. }
  93. p.cmd = cmd
  94. go p.eventLoop()
  95. return nil
  96. }
  97. // AwaitStartup waits for the Syncthing process to start and perform initial
  98. // scans of all folders.
  99. func (p *Process) AwaitStartup() {
  100. p.eventMut.Lock()
  101. for !p.startComplete {
  102. p.startCompleteCond.Wait()
  103. }
  104. p.eventMut.Unlock()
  105. return
  106. }
  107. // Stop stops the running Syncthing process. If the process was logging to a
  108. // local file (set by LogTo), the log file will be opened and checked for
  109. // panics and data races. The presence of either will be signalled in the form
  110. // of a returned error.
  111. func (p *Process) Stop() (*os.ProcessState, error) {
  112. p.eventMut.Lock()
  113. if p.stop {
  114. p.eventMut.Unlock()
  115. return p.cmd.ProcessState, nil
  116. }
  117. p.stop = true
  118. p.eventMut.Unlock()
  119. if _, err := p.Post("/rest/system/shutdown", nil); err != nil && err != io.ErrUnexpectedEOF {
  120. // Unexpected EOF is somewhat expected here, as we may exit before
  121. // returning something sensible.
  122. return nil, err
  123. }
  124. p.cmd.Wait()
  125. var err error
  126. if p.logfd != nil {
  127. err = p.checkForProblems(p.logfd)
  128. }
  129. return p.cmd.ProcessState, err
  130. }
  131. // Get performs an HTTP GET and returns the bytes and/or an error. Any non-200
  132. // return code is returned as an error.
  133. func (p *Process) Get(path string) ([]byte, error) {
  134. client := &http.Client{
  135. Timeout: 30 * time.Second,
  136. Transport: &http.Transport{
  137. DisableKeepAlives: true,
  138. },
  139. }
  140. url := fmt.Sprintf("http://%s%s", p.addr, path)
  141. req, err := http.NewRequest("GET", url, nil)
  142. if err != nil {
  143. return nil, err
  144. }
  145. req.Header.Add("X-API-Key", APIKey)
  146. resp, err := client.Do(req)
  147. if err != nil {
  148. return nil, err
  149. }
  150. return p.readResponse(resp)
  151. }
  152. // Post performs an HTTP POST and returns the bytes and/or an error. Any
  153. // non-200 return code is returned as an error.
  154. func (p *Process) Post(path string, data io.Reader) ([]byte, error) {
  155. client := &http.Client{
  156. Timeout: 600 * time.Second,
  157. Transport: &http.Transport{
  158. DisableKeepAlives: true,
  159. },
  160. }
  161. url := fmt.Sprintf("http://%s%s", p.addr, path)
  162. req, err := http.NewRequest("POST", url, data)
  163. if err != nil {
  164. return nil, err
  165. }
  166. req.Header.Add("X-API-Key", APIKey)
  167. req.Header.Add("Content-Type", "application/json")
  168. resp, err := client.Do(req)
  169. if err != nil {
  170. return nil, err
  171. }
  172. return p.readResponse(resp)
  173. }
  174. type Event struct {
  175. ID int
  176. Time time.Time
  177. Type string
  178. Data interface{}
  179. }
  180. func (p *Process) Events(since int) ([]Event, error) {
  181. bs, err := p.Get(fmt.Sprintf("/rest/events?since=%d", since))
  182. if err != nil {
  183. return nil, err
  184. }
  185. var evs []Event
  186. dec := json.NewDecoder(bytes.NewReader(bs))
  187. dec.UseNumber()
  188. err = dec.Decode(&evs)
  189. if err != nil {
  190. return nil, fmt.Errorf("Events: %s in %q", err, bs)
  191. }
  192. return evs, err
  193. }
  194. func (p *Process) Rescan(folder string) error {
  195. _, err := p.Post("/rest/db/scan?folder="+folder, nil)
  196. return err
  197. }
  198. func (p *Process) RescanDelay(folder string, delaySeconds int) error {
  199. _, err := p.Post(fmt.Sprintf("/rest/db/scan?folder=%s&next=%d", folder, delaySeconds), nil)
  200. return err
  201. }
  202. func (p *Process) RescanSub(folder string, sub string, delaySeconds int) error {
  203. return p.RescanSubs(folder, []string{sub}, delaySeconds)
  204. }
  205. func (p *Process) RescanSubs(folder string, subs []string, delaySeconds int) error {
  206. data := url.Values{}
  207. data.Set("folder", folder)
  208. for _, sub := range subs {
  209. data.Add("sub", sub)
  210. }
  211. data.Set("next", strconv.Itoa(delaySeconds))
  212. _, err := p.Post("/rest/db/scan?"+data.Encode(), nil)
  213. return err
  214. }
  215. func (p *Process) ConfigInSync() (bool, error) {
  216. bs, err := p.Get("/rest/system/config/insync")
  217. if err != nil {
  218. return false, err
  219. }
  220. return bytes.Contains(bs, []byte("true")), nil
  221. }
  222. func (p *Process) GetConfig() (config.Configuration, error) {
  223. var cfg config.Configuration
  224. bs, err := p.Get("/rest/system/config")
  225. if err != nil {
  226. return cfg, err
  227. }
  228. err = json.Unmarshal(bs, &cfg)
  229. return cfg, err
  230. }
  231. func (p *Process) PostConfig(cfg config.Configuration) error {
  232. buf := new(bytes.Buffer)
  233. if err := json.NewEncoder(buf).Encode(cfg); err != nil {
  234. return err
  235. }
  236. _, err := p.Post("/rest/system/config", buf)
  237. return err
  238. }
  239. func InSync(folder string, ps ...*Process) bool {
  240. for _, p := range ps {
  241. p.eventMut.Lock()
  242. }
  243. defer func() {
  244. for _, p := range ps {
  245. p.eventMut.Unlock()
  246. }
  247. }()
  248. for i := range ps {
  249. // If our latest FolderSummary didn't report 100%, then we are not done.
  250. if !ps[i].done[folder] {
  251. return false
  252. }
  253. // Check LocalVersion for each device. The local version seen by remote
  254. // devices should be the same as what it has locally, or the index
  255. // hasn't been sent yet.
  256. sourceID := ps[i].id.String()
  257. sourceVersion := ps[i].localVersion[folder][sourceID]
  258. for j := range ps {
  259. if i != j {
  260. remoteVersion := ps[j].localVersion[folder][sourceID]
  261. if remoteVersion != sourceVersion {
  262. return false
  263. }
  264. }
  265. }
  266. }
  267. return true
  268. }
  269. func AwaitSync(folder string, ps ...*Process) {
  270. for {
  271. time.Sleep(250 * time.Millisecond)
  272. if InSync(folder, ps...) {
  273. return
  274. }
  275. }
  276. }
  277. type Model struct {
  278. GlobalBytes int
  279. GlobalDeleted int
  280. GlobalFiles int
  281. InSyncBytes int
  282. InSyncFiles int
  283. Invalid string
  284. LocalBytes int
  285. LocalDeleted int
  286. LocalFiles int
  287. NeedBytes int
  288. NeedFiles int
  289. State string
  290. StateChanged time.Time
  291. Version int
  292. }
  293. func (p *Process) Model(folder string) (Model, error) {
  294. bs, err := p.Get("/rest/db/status?folder=" + folder)
  295. if err != nil {
  296. return Model{}, err
  297. }
  298. var res Model
  299. if err := json.Unmarshal(bs, &res); err != nil {
  300. return Model{}, err
  301. }
  302. if debug {
  303. l.Debugf("%+v", res)
  304. }
  305. return res, nil
  306. }
  307. func (p *Process) readResponse(resp *http.Response) ([]byte, error) {
  308. bs, err := ioutil.ReadAll(resp.Body)
  309. resp.Body.Close()
  310. if err != nil {
  311. return bs, err
  312. }
  313. if resp.StatusCode != 200 {
  314. return bs, fmt.Errorf("%s", resp.Status)
  315. }
  316. return bs, nil
  317. }
  318. func (p *Process) checkForProblems(logfd *os.File) error {
  319. fd, err := os.Open(logfd.Name())
  320. if err != nil {
  321. return err
  322. }
  323. defer fd.Close()
  324. raceConditionStart := []byte("WARNING: DATA RACE")
  325. raceConditionSep := []byte("==================")
  326. panicConditionStart := []byte("panic:")
  327. panicConditionSep := []byte(p.id.String()[:5])
  328. sc := bufio.NewScanner(fd)
  329. race := false
  330. _panic := false
  331. for sc.Scan() {
  332. line := sc.Bytes()
  333. if race || _panic {
  334. if bytes.Contains(line, panicConditionSep) {
  335. _panic = false
  336. continue
  337. }
  338. fmt.Printf("%s\n", line)
  339. if bytes.Contains(line, raceConditionSep) {
  340. race = false
  341. }
  342. } else if bytes.Contains(line, raceConditionStart) {
  343. fmt.Printf("%s\n", raceConditionSep)
  344. fmt.Printf("%s\n", raceConditionStart)
  345. race = true
  346. if err == nil {
  347. err = errors.New("Race condition detected")
  348. }
  349. } else if bytes.Contains(line, panicConditionStart) {
  350. _panic = true
  351. if err == nil {
  352. err = errors.New("Panic detected")
  353. }
  354. }
  355. }
  356. return err
  357. }
  358. func (p *Process) eventLoop() {
  359. since := 0
  360. notScanned := make(map[string]struct{})
  361. start := time.Now()
  362. for {
  363. p.eventMut.Lock()
  364. if p.stop {
  365. p.eventMut.Unlock()
  366. return
  367. }
  368. p.eventMut.Unlock()
  369. time.Sleep(250 * time.Millisecond)
  370. events, err := p.Events(since)
  371. if err != nil {
  372. if time.Since(start) < 5*time.Second {
  373. // The API has probably not started yet, lets give it some time.
  374. continue
  375. }
  376. // If we're stopping, no need to print the error.
  377. p.eventMut.Lock()
  378. if p.stop {
  379. p.eventMut.Unlock()
  380. return
  381. }
  382. p.eventMut.Unlock()
  383. log.Println("eventLoop: events:", err)
  384. continue
  385. }
  386. since = events[len(events)-1].ID
  387. for _, ev := range events {
  388. switch ev.Type {
  389. case "Starting":
  390. // The Starting event tells us where the configuration is. Load
  391. // it and populate our list of folders.
  392. data := ev.Data.(map[string]interface{})
  393. id, err := protocol.DeviceIDFromString(data["myID"].(string))
  394. if err != nil {
  395. log.Println("eventLoop: DeviceIdFromString:", err)
  396. continue
  397. }
  398. p.id = id
  399. home := data["home"].(string)
  400. w, err := config.Load(filepath.Join(home, "config.xml"), protocol.LocalDeviceID)
  401. if err != nil {
  402. log.Println("eventLoop: Starting:", err)
  403. continue
  404. }
  405. for id := range w.Folders() {
  406. p.eventMut.Lock()
  407. p.folders = append(p.folders, id)
  408. p.eventMut.Unlock()
  409. notScanned[id] = struct{}{}
  410. }
  411. case "StateChanged":
  412. // When a folder changes to idle, we tick it off by removing
  413. // it from p.notScanned.
  414. if !p.startComplete {
  415. data := ev.Data.(map[string]interface{})
  416. to := data["to"].(string)
  417. if to == "idle" {
  418. folder := data["folder"].(string)
  419. delete(notScanned, folder)
  420. if len(notScanned) == 0 {
  421. p.eventMut.Lock()
  422. p.startComplete = true
  423. p.startCompleteCond.Broadcast()
  424. p.eventMut.Unlock()
  425. }
  426. }
  427. }
  428. case "LocalIndexUpdated":
  429. data := ev.Data.(map[string]interface{})
  430. folder := data["folder"].(string)
  431. version, _ := data["version"].(json.Number).Int64()
  432. p.eventMut.Lock()
  433. m := p.localVersion[folder]
  434. if m == nil {
  435. m = make(map[string]int64)
  436. }
  437. m[p.id.String()] = version
  438. p.localVersion[folder] = m
  439. p.done[folder] = false
  440. if debug {
  441. l.Debugf("LocalIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  442. }
  443. p.eventMut.Unlock()
  444. case "RemoteIndexUpdated":
  445. data := ev.Data.(map[string]interface{})
  446. device := data["device"].(string)
  447. folder := data["folder"].(string)
  448. version, _ := data["version"].(json.Number).Int64()
  449. p.eventMut.Lock()
  450. m := p.localVersion[folder]
  451. if m == nil {
  452. m = make(map[string]int64)
  453. }
  454. m[device] = version
  455. p.localVersion[folder] = m
  456. p.done[folder] = false
  457. if debug {
  458. l.Debugf("RemoteIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  459. }
  460. p.eventMut.Unlock()
  461. case "FolderSummary":
  462. data := ev.Data.(map[string]interface{})
  463. folder := data["folder"].(string)
  464. summary := data["summary"].(map[string]interface{})
  465. need, _ := summary["needBytes"].(json.Number).Int64()
  466. done := need == 0
  467. p.eventMut.Lock()
  468. p.done[folder] = done
  469. if debug {
  470. l.Debugf("Foldersummary %v %v\n\t%+v", p.id, folder, p.done)
  471. }
  472. p.eventMut.Unlock()
  473. }
  474. }
  475. }
  476. }