syncthingprocess.go 7.6 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. // +build integration
  7. package integration
  8. import (
  9. "bufio"
  10. "bytes"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "io/ioutil"
  16. "log"
  17. "net/http"
  18. "os"
  19. "os/exec"
  20. "strconv"
  21. "time"
  22. "github.com/syncthing/protocol"
  23. )
  24. var env = []string{
  25. "HOME=.",
  26. "STGUIAPIKEY=" + apiKey,
  27. "STNORESTART=1",
  28. }
  29. type syncthingProcess struct {
  30. instance string
  31. argv []string
  32. port int
  33. apiKey string
  34. csrfToken string
  35. lastEvent int
  36. id protocol.DeviceID
  37. cmd *exec.Cmd
  38. logfd *os.File
  39. }
  40. func (p *syncthingProcess) start() error {
  41. if p.logfd == nil {
  42. logfd, err := os.Create("logs/" + getTestName() + "-" + p.instance + ".out")
  43. if err != nil {
  44. return err
  45. }
  46. p.logfd = logfd
  47. }
  48. binary := "../bin/syncthing"
  49. // We check to see if there's an instance specific binary we should run,
  50. // for example if we are running integration tests between different
  51. // versions. If there isn't, we just go with the default.
  52. if _, err := os.Stat(binary + "-" + p.instance); err == nil {
  53. binary = binary + "-" + p.instance
  54. }
  55. if _, err := os.Stat(binary + "-" + p.instance + ".exe"); err == nil {
  56. binary = binary + "-" + p.instance + ".exe"
  57. }
  58. cmd := exec.Command(binary, p.argv...)
  59. cmd.Stdout = p.logfd
  60. cmd.Stderr = p.logfd
  61. cmd.Env = append(os.Environ(), env...)
  62. err := cmd.Start()
  63. if err != nil {
  64. return err
  65. }
  66. p.cmd = cmd
  67. for {
  68. time.Sleep(250 * time.Millisecond)
  69. resp, err := p.get("/rest/system/status")
  70. if err != nil {
  71. continue
  72. }
  73. var sysData map[string]interface{}
  74. err = json.NewDecoder(resp.Body).Decode(&sysData)
  75. resp.Body.Close()
  76. if err != nil {
  77. // This one is unexpected. Print it.
  78. log.Println("/rest/system/status (JSON):", err)
  79. continue
  80. }
  81. id, err := protocol.DeviceIDFromString(sysData["myID"].(string))
  82. if err != nil {
  83. // This one is unexpected. Print it.
  84. log.Println("/rest/system/status (myID):", err)
  85. continue
  86. }
  87. p.id = id
  88. return nil
  89. }
  90. }
  91. func (p *syncthingProcess) stop() (*os.ProcessState, error) {
  92. p.cmd.Process.Signal(os.Kill)
  93. p.cmd.Wait()
  94. fd, err := os.Open(p.logfd.Name())
  95. if err != nil {
  96. return p.cmd.ProcessState, err
  97. }
  98. defer fd.Close()
  99. raceConditionStart := []byte("WARNING: DATA RACE")
  100. raceConditionSep := []byte("==================")
  101. panicConditionStart := []byte("panic:")
  102. panicConditionSep := []byte(p.id.String()[:5])
  103. sc := bufio.NewScanner(fd)
  104. race := false
  105. _panic := false
  106. for sc.Scan() {
  107. line := sc.Bytes()
  108. if race || _panic {
  109. if bytes.Contains(line, panicConditionSep) {
  110. _panic = false
  111. continue
  112. }
  113. fmt.Printf("%s\n", line)
  114. if bytes.Contains(line, raceConditionSep) {
  115. race = false
  116. }
  117. } else if bytes.Contains(line, raceConditionStart) {
  118. fmt.Printf("%s\n", raceConditionSep)
  119. fmt.Printf("%s\n", raceConditionStart)
  120. race = true
  121. if err == nil {
  122. err = errors.New("Race condition detected")
  123. }
  124. } else if bytes.Contains(line, panicConditionStart) {
  125. _panic = true
  126. if err == nil {
  127. err = errors.New("Panic detected")
  128. }
  129. }
  130. }
  131. return p.cmd.ProcessState, err
  132. }
  133. func (p *syncthingProcess) get(path string) (*http.Response, error) {
  134. client := &http.Client{
  135. Timeout: 30 * time.Second,
  136. Transport: &http.Transport{
  137. DisableKeepAlives: true,
  138. },
  139. }
  140. req, err := http.NewRequest("GET", fmt.Sprintf("http://127.0.0.1:%d%s", p.port, path), nil)
  141. if err != nil {
  142. return nil, err
  143. }
  144. if p.apiKey != "" {
  145. req.Header.Add("X-API-Key", p.apiKey)
  146. }
  147. if p.csrfToken != "" {
  148. req.Header.Add("X-CSRF-Token", p.csrfToken)
  149. }
  150. resp, err := client.Do(req)
  151. if err != nil {
  152. return nil, err
  153. }
  154. return resp, nil
  155. }
  156. func (p *syncthingProcess) post(path string, data io.Reader) (*http.Response, error) {
  157. client := &http.Client{
  158. Timeout: 600 * time.Second,
  159. Transport: &http.Transport{
  160. DisableKeepAlives: true,
  161. },
  162. }
  163. req, err := http.NewRequest("POST", fmt.Sprintf("http://127.0.0.1:%d%s", p.port, path), data)
  164. if err != nil {
  165. return nil, err
  166. }
  167. if p.apiKey != "" {
  168. req.Header.Add("X-API-Key", p.apiKey)
  169. }
  170. if p.csrfToken != "" {
  171. req.Header.Add("X-CSRF-Token", p.csrfToken)
  172. }
  173. req.Header.Add("Content-Type", "application/json")
  174. resp, err := client.Do(req)
  175. if err != nil {
  176. return nil, err
  177. }
  178. return resp, nil
  179. }
  180. func (p *syncthingProcess) peerCompletion() (map[string]int, error) {
  181. resp, err := p.get("/rest/debug/peerCompletion")
  182. if err != nil {
  183. return nil, err
  184. }
  185. defer resp.Body.Close()
  186. comp := map[string]int{}
  187. err = json.NewDecoder(resp.Body).Decode(&comp)
  188. // Remove ourselves from the set. In the remaining map, all peers should
  189. // be att 100% if we're in sync.
  190. for id := range comp {
  191. if id == p.id.String() {
  192. delete(comp, id)
  193. }
  194. }
  195. return comp, err
  196. }
  197. func (p *syncthingProcess) allPeersInSync() error {
  198. comp, err := p.peerCompletion()
  199. if err != nil {
  200. return err
  201. }
  202. for id, val := range comp {
  203. if val != 100 {
  204. return fmt.Errorf("%.7s at %d%%", id, val)
  205. }
  206. }
  207. return nil
  208. }
  209. type model struct {
  210. GlobalBytes int
  211. GlobalDeleted int
  212. GlobalFiles int
  213. InSyncBytes int
  214. InSyncFiles int
  215. Invalid string
  216. LocalBytes int
  217. LocalDeleted int
  218. LocalFiles int
  219. NeedBytes int
  220. NeedFiles int
  221. State string
  222. StateChanged time.Time
  223. Version int
  224. }
  225. func (p *syncthingProcess) model(folder string) (model, error) {
  226. resp, err := p.get("/rest/db/status?folder=" + folder)
  227. if err != nil {
  228. return model{}, err
  229. }
  230. var res model
  231. err = json.NewDecoder(resp.Body).Decode(&res)
  232. if err != nil {
  233. return model{}, err
  234. }
  235. return res, nil
  236. }
  237. type event struct {
  238. ID int
  239. Time time.Time
  240. Type string
  241. Data interface{}
  242. }
  243. func (p *syncthingProcess) events() ([]event, error) {
  244. resp, err := p.get(fmt.Sprintf("/rest/events?since=%d", p.lastEvent))
  245. if err != nil {
  246. return nil, err
  247. }
  248. defer resp.Body.Close()
  249. var evs []event
  250. err = json.NewDecoder(resp.Body).Decode(&evs)
  251. if err != nil {
  252. return nil, err
  253. }
  254. p.lastEvent = evs[len(evs)-1].ID
  255. return evs, err
  256. }
  257. type versionResp struct {
  258. Version string
  259. }
  260. func (p *syncthingProcess) version() (string, error) {
  261. resp, err := p.get("/rest/system/version")
  262. if err != nil {
  263. return "", err
  264. }
  265. defer resp.Body.Close()
  266. var v versionResp
  267. err = json.NewDecoder(resp.Body).Decode(&v)
  268. if err != nil {
  269. return "", err
  270. }
  271. return v.Version, nil
  272. }
  273. func (p *syncthingProcess) rescan(folder string) error {
  274. resp, err := p.post("/rest/db/scan?folder="+folder, nil)
  275. if err != nil {
  276. return err
  277. }
  278. data, _ := ioutil.ReadAll(resp.Body)
  279. resp.Body.Close()
  280. if resp.StatusCode != 200 {
  281. return fmt.Errorf("Rescan %q: status code %d: %s", folder, resp.StatusCode, data)
  282. }
  283. return nil
  284. }
  285. func (p *syncthingProcess) rescanNext(folder string, next time.Duration) error {
  286. resp, err := p.post("/rest/db/scan?folder="+folder+"&next="+strconv.Itoa(int(next.Seconds())), nil)
  287. if err != nil {
  288. return err
  289. }
  290. data, _ := ioutil.ReadAll(resp.Body)
  291. resp.Body.Close()
  292. if resp.StatusCode != 200 {
  293. return fmt.Errorf("Rescan %q: status code %d: %s", folder, resp.StatusCode, data)
  294. }
  295. return nil
  296. }
  297. func (p *syncthingProcess) reset(folder string) error {
  298. resp, err := p.post("/rest/system/reset?folder="+folder, nil)
  299. if err != nil {
  300. return err
  301. }
  302. data, _ := ioutil.ReadAll(resp.Body)
  303. resp.Body.Close()
  304. if resp.StatusCode != 200 {
  305. return fmt.Errorf("Reset %q: status code %d: %s", folder, resp.StatusCode, data)
  306. }
  307. return nil
  308. }
  309. func allDevicesInSync(p []syncthingProcess) error {
  310. for _, device := range p {
  311. if err := device.allPeersInSync(); err != nil {
  312. return fmt.Errorf("%.7s: %v", device.id.String(), err)
  313. }
  314. }
  315. return nil
  316. }