syncthingprocess.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. // +build integration
  7. package integration
  8. import (
  9. "bufio"
  10. "bytes"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "io/ioutil"
  16. "log"
  17. "net/http"
  18. "os"
  19. "os/exec"
  20. "strconv"
  21. "time"
  22. "github.com/syncthing/protocol"
  23. )
  24. var env = []string{
  25. "HOME=.",
  26. "STGUIAPIKEY=" + apiKey,
  27. "STNORESTART=1",
  28. }
  29. type syncthingProcess struct {
  30. instance string
  31. argv []string
  32. port int
  33. apiKey string
  34. csrfToken string
  35. lastEvent int
  36. id protocol.DeviceID
  37. cmd *exec.Cmd
  38. logfd *os.File
  39. }
  40. func (p *syncthingProcess) start() error {
  41. if p.logfd == nil {
  42. logfd, err := os.Create("logs/" + getTestName() + "-" + p.instance + ".out")
  43. if err != nil {
  44. return err
  45. }
  46. p.logfd = logfd
  47. }
  48. binary := "../bin/syncthing"
  49. // We check to see if there's an instance specific binary we should run,
  50. // for example if we are running integration tests between different
  51. // versions. If there isn't, we just go with the default.
  52. if _, err := os.Stat(binary + "-" + p.instance); err == nil {
  53. binary = binary + "-" + p.instance
  54. }
  55. if _, err := os.Stat(binary + "-" + p.instance + ".exe"); err == nil {
  56. binary = binary + "-" + p.instance + ".exe"
  57. }
  58. argv := append(p.argv, "-no-browser", "-verbose")
  59. cmd := exec.Command(binary, argv...)
  60. cmd.Stdout = p.logfd
  61. cmd.Stderr = p.logfd
  62. cmd.Env = append(os.Environ(), env...)
  63. err := cmd.Start()
  64. if err != nil {
  65. return err
  66. }
  67. p.cmd = cmd
  68. for {
  69. time.Sleep(250 * time.Millisecond)
  70. resp, err := p.get("/rest/system/status")
  71. if err != nil {
  72. continue
  73. }
  74. var sysData map[string]interface{}
  75. err = json.NewDecoder(resp.Body).Decode(&sysData)
  76. resp.Body.Close()
  77. if err != nil {
  78. // This one is unexpected. Print it.
  79. log.Println("/rest/system/status (JSON):", err)
  80. continue
  81. }
  82. id, err := protocol.DeviceIDFromString(sysData["myID"].(string))
  83. if err != nil {
  84. // This one is unexpected. Print it.
  85. log.Println("/rest/system/status (myID):", err)
  86. continue
  87. }
  88. p.id = id
  89. return nil
  90. }
  91. }
  92. func (p *syncthingProcess) stop() (*os.ProcessState, error) {
  93. p.cmd.Process.Signal(os.Kill)
  94. p.cmd.Wait()
  95. fd, err := os.Open(p.logfd.Name())
  96. if err != nil {
  97. return p.cmd.ProcessState, err
  98. }
  99. defer fd.Close()
  100. raceConditionStart := []byte("WARNING: DATA RACE")
  101. raceConditionSep := []byte("==================")
  102. panicConditionStart := []byte("panic:")
  103. panicConditionSep := []byte(p.id.String()[:5])
  104. sc := bufio.NewScanner(fd)
  105. race := false
  106. _panic := false
  107. for sc.Scan() {
  108. line := sc.Bytes()
  109. if race || _panic {
  110. if bytes.Contains(line, panicConditionSep) {
  111. _panic = false
  112. continue
  113. }
  114. fmt.Printf("%s\n", line)
  115. if bytes.Contains(line, raceConditionSep) {
  116. race = false
  117. }
  118. } else if bytes.Contains(line, raceConditionStart) {
  119. fmt.Printf("%s\n", raceConditionSep)
  120. fmt.Printf("%s\n", raceConditionStart)
  121. race = true
  122. if err == nil {
  123. err = errors.New("Race condition detected")
  124. }
  125. } else if bytes.Contains(line, panicConditionStart) {
  126. _panic = true
  127. if err == nil {
  128. err = errors.New("Panic detected")
  129. }
  130. }
  131. }
  132. return p.cmd.ProcessState, err
  133. }
  134. func (p *syncthingProcess) get(path string) (*http.Response, error) {
  135. client := &http.Client{
  136. Timeout: 30 * time.Second,
  137. Transport: &http.Transport{
  138. DisableKeepAlives: true,
  139. },
  140. }
  141. req, err := http.NewRequest("GET", fmt.Sprintf("http://127.0.0.1:%d%s", p.port, path), nil)
  142. if err != nil {
  143. return nil, err
  144. }
  145. if p.apiKey != "" {
  146. req.Header.Add("X-API-Key", p.apiKey)
  147. }
  148. if p.csrfToken != "" {
  149. req.Header.Add("X-CSRF-Token", p.csrfToken)
  150. }
  151. resp, err := client.Do(req)
  152. if err != nil {
  153. return nil, err
  154. }
  155. return resp, nil
  156. }
  157. func (p *syncthingProcess) post(path string, data io.Reader) (*http.Response, error) {
  158. client := &http.Client{
  159. Timeout: 600 * time.Second,
  160. Transport: &http.Transport{
  161. DisableKeepAlives: true,
  162. },
  163. }
  164. req, err := http.NewRequest("POST", fmt.Sprintf("http://127.0.0.1:%d%s", p.port, path), data)
  165. if err != nil {
  166. return nil, err
  167. }
  168. if p.apiKey != "" {
  169. req.Header.Add("X-API-Key", p.apiKey)
  170. }
  171. if p.csrfToken != "" {
  172. req.Header.Add("X-CSRF-Token", p.csrfToken)
  173. }
  174. req.Header.Add("Content-Type", "application/json")
  175. resp, err := client.Do(req)
  176. if err != nil {
  177. return nil, err
  178. }
  179. return resp, nil
  180. }
  181. type model struct {
  182. GlobalBytes int
  183. GlobalDeleted int
  184. GlobalFiles int
  185. InSyncBytes int
  186. InSyncFiles int
  187. Invalid string
  188. LocalBytes int
  189. LocalDeleted int
  190. LocalFiles int
  191. NeedBytes int
  192. NeedFiles int
  193. State string
  194. StateChanged time.Time
  195. Version int
  196. }
  197. func (p *syncthingProcess) model(folder string) (model, error) {
  198. resp, err := p.get("/rest/db/status?folder=" + folder)
  199. if err != nil {
  200. return model{}, err
  201. }
  202. var res model
  203. err = json.NewDecoder(resp.Body).Decode(&res)
  204. if err != nil {
  205. return model{}, err
  206. }
  207. return res, nil
  208. }
  209. type event struct {
  210. ID int
  211. Time time.Time
  212. Type string
  213. Data interface{}
  214. }
  215. func (p *syncthingProcess) events() ([]event, error) {
  216. resp, err := p.get(fmt.Sprintf("/rest/events?since=%d", p.lastEvent))
  217. if err != nil {
  218. return nil, err
  219. }
  220. defer resp.Body.Close()
  221. var evs []event
  222. err = json.NewDecoder(resp.Body).Decode(&evs)
  223. if err != nil {
  224. return nil, err
  225. }
  226. p.lastEvent = evs[len(evs)-1].ID
  227. return evs, err
  228. }
  229. type versionResp struct {
  230. Version string
  231. }
  232. func (p *syncthingProcess) version() (string, error) {
  233. resp, err := p.get("/rest/system/version")
  234. if err != nil {
  235. return "", err
  236. }
  237. defer resp.Body.Close()
  238. var v versionResp
  239. err = json.NewDecoder(resp.Body).Decode(&v)
  240. if err != nil {
  241. return "", err
  242. }
  243. return v.Version, nil
  244. }
  245. type statusResp struct {
  246. GlobalBytes int
  247. InSyncBytes int
  248. Version int
  249. }
  250. func (p *syncthingProcess) dbStatus(folder string) (statusResp, error) {
  251. resp, err := p.get("/rest/db/status?folder=" + folder)
  252. if err != nil {
  253. return statusResp{}, err
  254. }
  255. defer resp.Body.Close()
  256. var s statusResp
  257. err = json.NewDecoder(resp.Body).Decode(&s)
  258. if err != nil {
  259. return statusResp{}, err
  260. }
  261. return s, nil
  262. }
  263. func (p *syncthingProcess) insync(folder string) (bool, int, error) {
  264. s, err := p.dbStatus(folder)
  265. if err != nil {
  266. return false, 0, err
  267. }
  268. return s.GlobalBytes == s.InSyncBytes, s.Version, nil
  269. }
  270. func (p *syncthingProcess) rescan(folder string) error {
  271. resp, err := p.post("/rest/db/scan?folder="+folder, nil)
  272. if err != nil {
  273. return err
  274. }
  275. data, _ := ioutil.ReadAll(resp.Body)
  276. resp.Body.Close()
  277. if resp.StatusCode != 200 {
  278. return fmt.Errorf("Rescan %q: status code %d: %s", folder, resp.StatusCode, data)
  279. }
  280. return nil
  281. }
  282. func (p *syncthingProcess) rescanNext(folder string, next time.Duration) error {
  283. resp, err := p.post("/rest/db/scan?folder="+folder+"&next="+strconv.Itoa(int(next.Seconds())), nil)
  284. if err != nil {
  285. return err
  286. }
  287. data, _ := ioutil.ReadAll(resp.Body)
  288. resp.Body.Close()
  289. if resp.StatusCode != 200 {
  290. return fmt.Errorf("Rescan %q: status code %d: %s", folder, resp.StatusCode, data)
  291. }
  292. return nil
  293. }
  294. func (p *syncthingProcess) reset(folder string) error {
  295. resp, err := p.post("/rest/system/reset?folder="+folder, nil)
  296. if err != nil {
  297. return err
  298. }
  299. data, _ := ioutil.ReadAll(resp.Body)
  300. resp.Body.Close()
  301. if resp.StatusCode != 200 {
  302. return fmt.Errorf("Reset %q: status code %d: %s", folder, resp.StatusCode, data)
  303. }
  304. return nil
  305. }
  306. func awaitCompletion(folder string, ps ...syncthingProcess) error {
  307. mainLoop:
  308. for {
  309. time.Sleep(2500 * time.Millisecond)
  310. expectedVersion := 0
  311. for _, p := range ps {
  312. insync, version, err := p.insync(folder)
  313. if err != nil {
  314. if isTimeout(err) {
  315. continue mainLoop
  316. }
  317. return err
  318. }
  319. if !insync {
  320. continue mainLoop
  321. }
  322. if expectedVersion == 0 {
  323. expectedVersion = version
  324. } else if version != expectedVersion {
  325. // Version number mismatch between devices, so not in sync.
  326. continue mainLoop
  327. }
  328. }
  329. return nil
  330. }
  331. }
  332. func waitForScan(p syncthingProcess) {
  333. // Wait for one scan to succeed, or up to 20 seconds...
  334. for i := 0; i < 20; i++ {
  335. err := p.rescan("default")
  336. if err != nil {
  337. time.Sleep(time.Second)
  338. continue
  339. }
  340. break
  341. }
  342. }