protocol_test.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. // Copyright (C) 2014 The Protocol Authors.
  2. package protocol
  3. import (
  4. "bytes"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "encoding/json"
  8. "errors"
  9. "flag"
  10. "fmt"
  11. "io"
  12. "io/ioutil"
  13. "os"
  14. "reflect"
  15. "strings"
  16. "testing"
  17. "testing/quick"
  18. "time"
  19. "github.com/calmh/xdr"
  20. )
  21. var (
  22. c0ID = NewDeviceID([]byte{1})
  23. c1ID = NewDeviceID([]byte{2})
  24. quickCfg = &quick.Config{}
  25. )
  26. func TestMain(m *testing.M) {
  27. flag.Parse()
  28. if flag.Lookup("test.short").Value.String() != "false" {
  29. quickCfg.MaxCount = 10
  30. }
  31. os.Exit(m.Run())
  32. }
  33. func TestHeaderEncodeDecode(t *testing.T) {
  34. f := func(ver, id, typ int) bool {
  35. ver = int(uint(ver) % 16)
  36. id = int(uint(id) % 4096)
  37. typ = int(uint(typ) % 256)
  38. h0 := header{version: ver, msgID: id, msgType: typ}
  39. h1 := decodeHeader(encodeHeader(h0))
  40. return h0 == h1
  41. }
  42. if err := quick.Check(f, nil); err != nil {
  43. t.Error(err)
  44. }
  45. }
  46. func TestHeaderMarshalUnmarshal(t *testing.T) {
  47. f := func(ver, id, typ int) bool {
  48. ver = int(uint(ver) % 16)
  49. id = int(uint(id) % 4096)
  50. typ = int(uint(typ) % 256)
  51. buf := new(bytes.Buffer)
  52. xw := xdr.NewWriter(buf)
  53. h0 := header{version: ver, msgID: id, msgType: typ}
  54. h0.encodeXDR(xw)
  55. xr := xdr.NewReader(buf)
  56. var h1 header
  57. h1.decodeXDR(xr)
  58. return h0 == h1
  59. }
  60. if err := quick.Check(f, nil); err != nil {
  61. t.Error(err)
  62. }
  63. }
  64. func TestHeaderLayout(t *testing.T) {
  65. var e, a uint32
  66. // Version are the first four bits
  67. e = 0xf0000000
  68. a = encodeHeader(header{version: 0xf})
  69. if a != e {
  70. t.Errorf("Header layout incorrect; %08x != %08x", a, e)
  71. }
  72. // Message ID are the following 12 bits
  73. e = 0x0fff0000
  74. a = encodeHeader(header{msgID: 0xfff})
  75. if a != e {
  76. t.Errorf("Header layout incorrect; %08x != %08x", a, e)
  77. }
  78. // Type are the last 8 bits before reserved
  79. e = 0x0000ff00
  80. a = encodeHeader(header{msgType: 0xff})
  81. if a != e {
  82. t.Errorf("Header layout incorrect; %08x != %08x", a, e)
  83. }
  84. }
  85. func TestPing(t *testing.T) {
  86. ar, aw := io.Pipe()
  87. br, bw := io.Pipe()
  88. c0 := NewConnection(c0ID, ar, bw, newTestModel(), "name", CompressAlways).(wireFormatConnection).Connection.(*rawConnection)
  89. c0.Start()
  90. c1 := NewConnection(c1ID, br, aw, newTestModel(), "name", CompressAlways).(wireFormatConnection).Connection.(*rawConnection)
  91. c1.Start()
  92. c0.ClusterConfig(ClusterConfigMessage{})
  93. c1.ClusterConfig(ClusterConfigMessage{})
  94. if ok := c0.ping(); !ok {
  95. t.Error("c0 ping failed")
  96. }
  97. if ok := c1.ping(); !ok {
  98. t.Error("c1 ping failed")
  99. }
  100. }
  101. func TestVersionErr(t *testing.T) {
  102. m0 := newTestModel()
  103. m1 := newTestModel()
  104. ar, aw := io.Pipe()
  105. br, bw := io.Pipe()
  106. c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).Connection.(*rawConnection)
  107. c0.Start()
  108. c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways)
  109. c1.Start()
  110. c0.ClusterConfig(ClusterConfigMessage{})
  111. c1.ClusterConfig(ClusterConfigMessage{})
  112. w := xdr.NewWriter(c0.cw)
  113. timeoutWriteHeader(w, header{
  114. version: 2, // higher than supported
  115. msgID: 0,
  116. msgType: messageTypeIndex,
  117. })
  118. if err := m1.closedError(); err == nil || !strings.Contains(err.Error(), "unknown protocol version") {
  119. t.Error("Connection should close due to unknown version, not", err)
  120. }
  121. }
  122. func TestTypeErr(t *testing.T) {
  123. m0 := newTestModel()
  124. m1 := newTestModel()
  125. ar, aw := io.Pipe()
  126. br, bw := io.Pipe()
  127. c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).Connection.(*rawConnection)
  128. c0.Start()
  129. c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways)
  130. c1.Start()
  131. c0.ClusterConfig(ClusterConfigMessage{})
  132. c1.ClusterConfig(ClusterConfigMessage{})
  133. w := xdr.NewWriter(c0.cw)
  134. timeoutWriteHeader(w, header{
  135. version: 0,
  136. msgID: 0,
  137. msgType: 42, // unknown type
  138. })
  139. if err := m1.closedError(); err == nil || !strings.Contains(err.Error(), "unknown message type") {
  140. t.Error("Connection should close due to unknown message type, not", err)
  141. }
  142. }
  143. func TestClose(t *testing.T) {
  144. m0 := newTestModel()
  145. m1 := newTestModel()
  146. ar, aw := io.Pipe()
  147. br, bw := io.Pipe()
  148. c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).Connection.(*rawConnection)
  149. c0.Start()
  150. c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways)
  151. c1.Start()
  152. c0.ClusterConfig(ClusterConfigMessage{})
  153. c1.ClusterConfig(ClusterConfigMessage{})
  154. c0.close(errors.New("manual close"))
  155. <-c0.closed
  156. if err := m0.closedError(); err == nil || !strings.Contains(err.Error(), "manual close") {
  157. t.Fatal("Connection should be closed")
  158. }
  159. // None of these should panic, some should return an error
  160. if c0.ping() {
  161. t.Error("Ping should not return true")
  162. }
  163. c0.Index("default", nil, 0, nil)
  164. c0.Index("default", nil, 0, nil)
  165. if _, err := c0.Request("default", "foo", 0, 0, nil, 0, nil); err == nil {
  166. t.Error("Request should return an error")
  167. }
  168. }
  169. func TestElementSizeExceededNested(t *testing.T) {
  170. m := ClusterConfigMessage{
  171. ClientName: "longstringlongstringlongstringinglongstringlongstringlonlongstringlongstringlon",
  172. }
  173. _, err := m.EncodeXDR(ioutil.Discard)
  174. if err == nil {
  175. t.Errorf("ID length %d > max 64, but no error", len(m.Folders[0].ID))
  176. }
  177. }
  178. func TestMarshalIndexMessage(t *testing.T) {
  179. f := func(m1 IndexMessage) bool {
  180. for i, f := range m1.Files {
  181. m1.Files[i].CachedSize = 0
  182. for j := range f.Blocks {
  183. f.Blocks[j].Offset = 0
  184. if len(f.Blocks[j].Hash) == 0 {
  185. f.Blocks[j].Hash = nil
  186. }
  187. }
  188. }
  189. return testMarshal(t, "index", &m1, &IndexMessage{})
  190. }
  191. if err := quick.Check(f, quickCfg); err != nil {
  192. t.Error(err)
  193. }
  194. }
  195. func TestMarshalRequestMessage(t *testing.T) {
  196. f := func(m1 RequestMessage) bool {
  197. return testMarshal(t, "request", &m1, &RequestMessage{})
  198. }
  199. if err := quick.Check(f, quickCfg); err != nil {
  200. t.Error(err)
  201. }
  202. }
  203. func TestMarshalResponseMessage(t *testing.T) {
  204. f := func(m1 ResponseMessage) bool {
  205. if len(m1.Data) == 0 {
  206. m1.Data = nil
  207. }
  208. return testMarshal(t, "response", &m1, &ResponseMessage{})
  209. }
  210. if err := quick.Check(f, quickCfg); err != nil {
  211. t.Error(err)
  212. }
  213. }
  214. func TestMarshalClusterConfigMessage(t *testing.T) {
  215. f := func(m1 ClusterConfigMessage) bool {
  216. return testMarshal(t, "clusterconfig", &m1, &ClusterConfigMessage{})
  217. }
  218. if err := quick.Check(f, quickCfg); err != nil {
  219. t.Error(err)
  220. }
  221. }
  222. func TestMarshalCloseMessage(t *testing.T) {
  223. f := func(m1 CloseMessage) bool {
  224. return testMarshal(t, "close", &m1, &CloseMessage{})
  225. }
  226. if err := quick.Check(f, quickCfg); err != nil {
  227. t.Error(err)
  228. }
  229. }
  230. type message interface {
  231. EncodeXDR(io.Writer) (int, error)
  232. DecodeXDR(io.Reader) error
  233. }
  234. func testMarshal(t *testing.T, prefix string, m1, m2 message) bool {
  235. var buf bytes.Buffer
  236. failed := func(bc []byte) {
  237. bs, _ := json.MarshalIndent(m1, "", " ")
  238. ioutil.WriteFile(prefix+"-1.txt", bs, 0644)
  239. bs, _ = json.MarshalIndent(m2, "", " ")
  240. ioutil.WriteFile(prefix+"-2.txt", bs, 0644)
  241. if len(bc) > 0 {
  242. f, _ := os.Create(prefix + "-data.txt")
  243. fmt.Fprint(f, hex.Dump(bc))
  244. f.Close()
  245. }
  246. }
  247. _, err := m1.EncodeXDR(&buf)
  248. if err != nil && strings.Contains(err.Error(), "exceeds size") {
  249. return true
  250. }
  251. if err != nil {
  252. failed(nil)
  253. t.Fatal(err)
  254. }
  255. bc := make([]byte, len(buf.Bytes()))
  256. copy(bc, buf.Bytes())
  257. err = m2.DecodeXDR(&buf)
  258. if err != nil {
  259. failed(bc)
  260. t.Fatal(err)
  261. }
  262. ok := reflect.DeepEqual(m1, m2)
  263. if !ok {
  264. failed(bc)
  265. }
  266. return ok
  267. }
  268. func timeoutWriteHeader(w *xdr.Writer, hdr header) {
  269. // This tries to write a message header to w, but times out after a while.
  270. // This is useful because in testing, with a PipeWriter, it will block
  271. // forever if the other side isn't reading any more. On the other hand we
  272. // can't just "go" it into the background, because if the other side is
  273. // still there we should wait for the write to complete. Yay.
  274. var buf [8]byte // header and message length
  275. binary.BigEndian.PutUint32(buf[:], encodeHeader(hdr))
  276. binary.BigEndian.PutUint32(buf[4:], 0) // zero message length, explicitly
  277. done := make(chan struct{})
  278. go func() {
  279. w.WriteRaw(buf[:])
  280. l.Infoln("write completed")
  281. close(done)
  282. }()
  283. select {
  284. case <-done:
  285. case <-time.After(250 * time.Millisecond):
  286. }
  287. }
  288. func TestFileInfoSize(t *testing.T) {
  289. fi := FileInfo{
  290. Blocks: []BlockInfo{
  291. {Size: 42},
  292. {Offset: 42, Size: 23},
  293. {Offset: 42 + 23, Size: 34},
  294. },
  295. }
  296. size := fi.Size()
  297. want := int64(42 + 23 + 34)
  298. if size != want {
  299. t.Errorf("Incorrect size reported, got %d, want %d", size, want)
  300. }
  301. size = fi.Size() // Cached, this time
  302. if size != want {
  303. t.Errorf("Incorrect cached size reported, got %d, want %d", size, want)
  304. }
  305. fi.CachedSize = 8
  306. want = 8
  307. size = fi.Size() // Ensure it came from the cache
  308. if size != want {
  309. t.Errorf("Incorrect cached size reported, got %d, want %d", size, want)
  310. }
  311. fi.CachedSize = 0
  312. fi.Flags = FlagDirectory
  313. want = 128
  314. size = fi.Size() // Directories are 128 bytes large
  315. if size != want {
  316. t.Errorf("Incorrect cached size reported, got %d, want %d", size, want)
  317. }
  318. fi.CachedSize = 0
  319. fi.Flags = FlagDeleted
  320. want = 128
  321. size = fi.Size() // Also deleted files
  322. if size != want {
  323. t.Errorf("Incorrect cached size reported, got %d, want %d", size, want)
  324. }
  325. }