rc.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. // Package rc provides remote control of a Syncthing process via the REST API.
  7. package rc
  8. import (
  9. "bufio"
  10. "bytes"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "io/ioutil"
  16. "log"
  17. "net/http"
  18. "os"
  19. "os/exec"
  20. "path/filepath"
  21. stdsync "sync"
  22. "time"
  23. "github.com/syncthing/protocol"
  24. "github.com/syncthing/syncthing/internal/config"
  25. "github.com/syncthing/syncthing/internal/sync"
  26. )
  27. // We set the API key via the STGUIAPIKEY variable when we launch the binary,
  28. // to ensure that we have API access regardless of authentication settings.
  29. const APIKey = "592A47BC-A7DF-4C2F-89E0-A80B3E5094EE"
  30. type Process struct {
  31. // Set at initialization
  32. addr string
  33. // Set by eventLoop()
  34. eventMut sync.Mutex
  35. id protocol.DeviceID
  36. folders []string
  37. startComplete bool
  38. startCompleteCond *stdsync.Cond
  39. stop bool
  40. localVersion map[string]map[string]int64 // Folder ID => Device ID => LocalVersion
  41. done map[string]bool // Folder ID => 100%
  42. cmd *exec.Cmd
  43. logfd *os.File
  44. }
  45. // NewProcess returns a new Process talking to Syncthing at the specified address.
  46. // Example: NewProcess("127.0.0.1:8082")
  47. func NewProcess(addr string) *Process {
  48. p := &Process{
  49. addr: addr,
  50. localVersion: make(map[string]map[string]int64),
  51. done: make(map[string]bool),
  52. eventMut: sync.NewMutex(),
  53. }
  54. p.startCompleteCond = stdsync.NewCond(p.eventMut)
  55. return p
  56. }
  57. // LogTo creates the specified log file and ensures that stdout and stderr
  58. // from the Start()ed process is redirected there. Must be called before
  59. // Start().
  60. func (p *Process) LogTo(filename string) error {
  61. if p.cmd != nil {
  62. panic("logfd cannot be set with an existing cmd")
  63. }
  64. if p.logfd != nil {
  65. p.logfd.Close()
  66. }
  67. fd, err := os.Create(filename)
  68. if err != nil {
  69. return err
  70. }
  71. p.logfd = fd
  72. return nil
  73. }
  74. // Start runs the specified Syncthing binary with the given arguments.
  75. // Syncthing should be configured to provide an API on the address given to
  76. // NewProcess. Event processing is started.
  77. func (p *Process) Start(bin string, args ...string) error {
  78. cmd := exec.Command(bin, args...)
  79. if p.logfd != nil {
  80. cmd.Stdout = p.logfd
  81. cmd.Stderr = p.logfd
  82. }
  83. cmd.Env = append(os.Environ(), "STNORESTART=1", "STGUIAPIKEY="+APIKey)
  84. err := cmd.Start()
  85. if err != nil {
  86. return err
  87. }
  88. p.cmd = cmd
  89. go p.eventLoop()
  90. return nil
  91. }
  92. // AwaitStartup waits for the Syncthing process to start and perform initial
  93. // scans of all folders.
  94. func (p *Process) AwaitStartup() {
  95. p.eventMut.Lock()
  96. for !p.startComplete {
  97. p.startCompleteCond.Wait()
  98. }
  99. p.eventMut.Unlock()
  100. return
  101. }
  102. // Stop stops the running Syncthing process. If the process was logging to a
  103. // local file (set by LogTo), the log file will be opened and checked for
  104. // panics and data races. The presence of either will be signalled in the form
  105. // of a returned error.
  106. func (p *Process) Stop() (*os.ProcessState, error) {
  107. p.eventMut.Lock()
  108. if p.stop {
  109. p.eventMut.Unlock()
  110. return p.cmd.ProcessState, nil
  111. }
  112. p.stop = true
  113. p.eventMut.Unlock()
  114. if err := p.cmd.Process.Signal(os.Kill); err != nil {
  115. return nil, err
  116. }
  117. p.cmd.Wait()
  118. var err error
  119. if p.logfd != nil {
  120. err = p.checkForProblems(p.logfd)
  121. }
  122. return p.cmd.ProcessState, err
  123. }
  124. // Get performs an HTTP GET and returns the bytes and/or an error. Any non-200
  125. // return code is returned as an error.
  126. func (p *Process) Get(path string) ([]byte, error) {
  127. client := &http.Client{
  128. Timeout: 30 * time.Second,
  129. Transport: &http.Transport{
  130. DisableKeepAlives: true,
  131. },
  132. }
  133. url := fmt.Sprintf("http://%s%s", p.addr, path)
  134. req, err := http.NewRequest("GET", url, nil)
  135. if err != nil {
  136. return nil, err
  137. }
  138. req.Header.Add("X-API-Key", APIKey)
  139. resp, err := client.Do(req)
  140. if err != nil {
  141. return nil, err
  142. }
  143. return p.readResponse(resp)
  144. }
  145. // Post performs an HTTP POST and returns the bytes and/or an error. Any
  146. // non-200 return code is returned as an error.
  147. func (p *Process) Post(path string, data io.Reader) ([]byte, error) {
  148. client := &http.Client{
  149. Timeout: 600 * time.Second,
  150. Transport: &http.Transport{
  151. DisableKeepAlives: true,
  152. },
  153. }
  154. url := fmt.Sprintf("http://%s%s", p.addr, path)
  155. req, err := http.NewRequest("POST", url, data)
  156. if err != nil {
  157. return nil, err
  158. }
  159. req.Header.Add("X-API-Key", APIKey)
  160. req.Header.Add("Content-Type", "application/json")
  161. resp, err := client.Do(req)
  162. if err != nil {
  163. return nil, err
  164. }
  165. return p.readResponse(resp)
  166. }
  167. type Event struct {
  168. ID int
  169. Time time.Time
  170. Type string
  171. Data interface{}
  172. }
  173. func (p *Process) Events(since int) ([]Event, error) {
  174. bs, err := p.Get(fmt.Sprintf("/rest/events?since=%d", since))
  175. if err != nil {
  176. return nil, err
  177. }
  178. var evs []Event
  179. dec := json.NewDecoder(bytes.NewReader(bs))
  180. dec.UseNumber()
  181. err = dec.Decode(&evs)
  182. if err != nil {
  183. return nil, fmt.Errorf("Events: %s in %q", err, bs)
  184. }
  185. return evs, err
  186. }
  187. func (p *Process) Rescan(folder string) error {
  188. _, err := p.Post("/rest/db/scan?folder="+folder, nil)
  189. return err
  190. }
  191. func (p *Process) RescanDelay(folder string, delaySeconds int) error {
  192. _, err := p.Post(fmt.Sprintf("/rest/db/scan?folder=%s&next=%d", folder, delaySeconds), nil)
  193. return err
  194. }
  195. func InSync(folder string, ps ...*Process) bool {
  196. for _, p := range ps {
  197. p.eventMut.Lock()
  198. }
  199. defer func() {
  200. for _, p := range ps {
  201. p.eventMut.Unlock()
  202. }
  203. }()
  204. for i := range ps {
  205. // If our latest FolderSummary didn't report 100%, then we are not done.
  206. if !ps[i].done[folder] {
  207. return false
  208. }
  209. // Check LocalVersion for each device. The local version seen by remote
  210. // devices should be the same as what it has locally, or the index
  211. // hasn't been sent yet.
  212. sourceID := ps[i].id.String()
  213. sourceVersion := ps[i].localVersion[folder][sourceID]
  214. for j := range ps {
  215. if i != j {
  216. remoteVersion := ps[j].localVersion[folder][sourceID]
  217. if remoteVersion != sourceVersion {
  218. return false
  219. }
  220. }
  221. }
  222. }
  223. return true
  224. }
  225. func AwaitSync(folder string, ps ...*Process) {
  226. for {
  227. time.Sleep(250 * time.Millisecond)
  228. if InSync(folder, ps...) {
  229. return
  230. }
  231. }
  232. }
  233. type Model struct {
  234. GlobalBytes int
  235. GlobalDeleted int
  236. GlobalFiles int
  237. InSyncBytes int
  238. InSyncFiles int
  239. Invalid string
  240. LocalBytes int
  241. LocalDeleted int
  242. LocalFiles int
  243. NeedBytes int
  244. NeedFiles int
  245. State string
  246. StateChanged time.Time
  247. Version int
  248. }
  249. func (p *Process) Model(folder string) (Model, error) {
  250. bs, err := p.Get("/rest/db/status?folder=" + folder)
  251. if err != nil {
  252. return Model{}, err
  253. }
  254. var res Model
  255. if err := json.Unmarshal(bs, &res); err != nil {
  256. return Model{}, err
  257. }
  258. if debug {
  259. l.Debugf("%+v", res)
  260. }
  261. return res, nil
  262. }
  263. func (p *Process) readResponse(resp *http.Response) ([]byte, error) {
  264. bs, err := ioutil.ReadAll(resp.Body)
  265. resp.Body.Close()
  266. if err != nil {
  267. return bs, err
  268. }
  269. if resp.StatusCode != 200 {
  270. return bs, fmt.Errorf("%s", resp.Status)
  271. }
  272. return bs, nil
  273. }
  274. func (p *Process) checkForProblems(logfd *os.File) error {
  275. fd, err := os.Open(logfd.Name())
  276. if err != nil {
  277. return err
  278. }
  279. defer fd.Close()
  280. raceConditionStart := []byte("WARNING: DATA RACE")
  281. raceConditionSep := []byte("==================")
  282. panicConditionStart := []byte("panic:")
  283. panicConditionSep := []byte(p.id.String()[:5])
  284. sc := bufio.NewScanner(fd)
  285. race := false
  286. _panic := false
  287. for sc.Scan() {
  288. line := sc.Bytes()
  289. if race || _panic {
  290. if bytes.Contains(line, panicConditionSep) {
  291. _panic = false
  292. continue
  293. }
  294. fmt.Printf("%s\n", line)
  295. if bytes.Contains(line, raceConditionSep) {
  296. race = false
  297. }
  298. } else if bytes.Contains(line, raceConditionStart) {
  299. fmt.Printf("%s\n", raceConditionSep)
  300. fmt.Printf("%s\n", raceConditionStart)
  301. race = true
  302. if err == nil {
  303. err = errors.New("Race condition detected")
  304. }
  305. } else if bytes.Contains(line, panicConditionStart) {
  306. _panic = true
  307. if err == nil {
  308. err = errors.New("Panic detected")
  309. }
  310. }
  311. }
  312. return err
  313. }
  314. func (p *Process) eventLoop() {
  315. since := 0
  316. notScanned := make(map[string]struct{})
  317. start := time.Now()
  318. for {
  319. p.eventMut.Lock()
  320. if p.stop {
  321. p.eventMut.Unlock()
  322. return
  323. }
  324. p.eventMut.Unlock()
  325. time.Sleep(250 * time.Millisecond)
  326. events, err := p.Events(since)
  327. if err != nil {
  328. if time.Since(start) < 5*time.Second {
  329. // The API has probably not started yet, lets give it some time.
  330. continue
  331. }
  332. // If we're stopping, no need to print the error.
  333. p.eventMut.Lock()
  334. if p.stop {
  335. p.eventMut.Unlock()
  336. return
  337. }
  338. p.eventMut.Unlock()
  339. log.Println("eventLoop: events:", err)
  340. continue
  341. }
  342. since = events[len(events)-1].ID
  343. for _, ev := range events {
  344. switch ev.Type {
  345. case "Starting":
  346. // The Starting event tells us where the configuration is. Load
  347. // it and populate our list of folders.
  348. data := ev.Data.(map[string]interface{})
  349. id, err := protocol.DeviceIDFromString(data["myID"].(string))
  350. if err != nil {
  351. log.Println("eventLoop: DeviceIdFromString:", err)
  352. continue
  353. }
  354. p.id = id
  355. home := data["home"].(string)
  356. w, err := config.Load(filepath.Join(home, "config.xml"), protocol.LocalDeviceID)
  357. if err != nil {
  358. log.Println("eventLoop: Starting:", err)
  359. continue
  360. }
  361. for id := range w.Folders() {
  362. p.eventMut.Lock()
  363. p.folders = append(p.folders, id)
  364. p.eventMut.Unlock()
  365. notScanned[id] = struct{}{}
  366. }
  367. case "StateChanged":
  368. // When a folder changes to idle, we tick it off by removing
  369. // it from p.notScanned.
  370. if !p.startComplete {
  371. data := ev.Data.(map[string]interface{})
  372. to := data["to"].(string)
  373. if to == "idle" {
  374. folder := data["folder"].(string)
  375. delete(notScanned, folder)
  376. if len(notScanned) == 0 {
  377. p.eventMut.Lock()
  378. p.startComplete = true
  379. p.startCompleteCond.Broadcast()
  380. p.eventMut.Unlock()
  381. }
  382. }
  383. }
  384. case "LocalIndexUpdated":
  385. data := ev.Data.(map[string]interface{})
  386. folder := data["folder"].(string)
  387. version, _ := data["version"].(json.Number).Int64()
  388. p.eventMut.Lock()
  389. m := p.localVersion[folder]
  390. if m == nil {
  391. m = make(map[string]int64)
  392. }
  393. m[p.id.String()] = version
  394. p.localVersion[folder] = m
  395. p.done[folder] = false
  396. if debug {
  397. l.Debugf("LocalIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  398. }
  399. p.eventMut.Unlock()
  400. case "RemoteIndexUpdated":
  401. data := ev.Data.(map[string]interface{})
  402. device := data["device"].(string)
  403. folder := data["folder"].(string)
  404. version, _ := data["version"].(json.Number).Int64()
  405. p.eventMut.Lock()
  406. m := p.localVersion[folder]
  407. if m == nil {
  408. m = make(map[string]int64)
  409. }
  410. m[device] = version
  411. p.localVersion[folder] = m
  412. p.done[folder] = false
  413. if debug {
  414. l.Debugf("RemoteIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  415. }
  416. p.eventMut.Unlock()
  417. case "FolderSummary":
  418. data := ev.Data.(map[string]interface{})
  419. folder := data["folder"].(string)
  420. summary := data["summary"].(map[string]interface{})
  421. need, _ := summary["needBytes"].(json.Number).Int64()
  422. done := need == 0
  423. p.eventMut.Lock()
  424. p.done[folder] = done
  425. if debug {
  426. l.Debugf("Foldersummary %v %v\n\t%+v", p.id, folder, p.done)
  427. }
  428. p.eventMut.Unlock()
  429. }
  430. }
  431. }
  432. }