indexhandler_test.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. // Copyright (C) 2024 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model_test
  7. import (
  8. "context"
  9. "fmt"
  10. "io"
  11. "sync"
  12. "testing"
  13. "github.com/syncthing/syncthing/lib/model"
  14. "github.com/syncthing/syncthing/lib/model/mocks"
  15. "github.com/syncthing/syncthing/lib/protocol"
  16. protomock "github.com/syncthing/syncthing/lib/protocol/mocks"
  17. "github.com/syncthing/syncthing/lib/testutil"
  18. )
  19. func TestIndexhandlerConcurrency(t *testing.T) {
  20. // Verify that sending a lot of index update messages using the
  21. // FileInfoBatch works and doesn't trigger the race detector.
  22. ctx, cancel := context.WithCancel(context.Background())
  23. defer cancel()
  24. ar, aw := io.Pipe()
  25. br, bw := io.Pipe()
  26. ci := &protomock.ConnectionInfo{}
  27. m1 := &mocks.Model{}
  28. c1 := protocol.NewConnection(protocol.EmptyDeviceID, ar, bw, testutil.NoopCloser{}, m1, ci, protocol.CompressionNever, nil)
  29. c1.Start()
  30. defer c1.Close(io.EOF)
  31. m2 := &mocks.Model{}
  32. c2 := protocol.NewConnection(protocol.EmptyDeviceID, br, aw, testutil.NoopCloser{}, m2, ci, protocol.CompressionNever, nil)
  33. c2.Start()
  34. defer c2.Close(io.EOF)
  35. c1.ClusterConfig(&protocol.ClusterConfig{}, nil)
  36. c2.ClusterConfig(&protocol.ClusterConfig{}, nil)
  37. c1.Index(ctx, &protocol.Index{Folder: "foo"})
  38. c2.Index(ctx, &protocol.Index{Folder: "foo"})
  39. const msgs = 5e2
  40. const files = 1e3
  41. recvdEntries := 0
  42. recvdBatches := 0
  43. var wg sync.WaitGroup
  44. m2.IndexUpdateCalls(func(_ protocol.Connection, idxUp *protocol.IndexUpdate) error {
  45. for j := 0; j < files; j++ {
  46. if n := idxUp.Files[j].Name; n != fmt.Sprintf("f%d-%d", recvdBatches, j) {
  47. t.Error("wrong filename", n)
  48. }
  49. recvdEntries++
  50. }
  51. recvdBatches++
  52. wg.Done()
  53. return nil
  54. })
  55. b1 := model.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
  56. return c1.IndexUpdate(ctx, &protocol.IndexUpdate{Folder: "foo", Files: fs})
  57. })
  58. sentEntries := 0
  59. for i := 0; i < msgs; i++ {
  60. for j := 0; j < files; j++ {
  61. b1.Append(protocol.FileInfo{
  62. Name: fmt.Sprintf("f%d-%d", i, j),
  63. Blocks: []protocol.BlockInfo{{Hash: make([]byte, 32)}},
  64. })
  65. sentEntries++
  66. }
  67. wg.Add(1)
  68. if err := b1.Flush(); err != nil {
  69. t.Fatal(err)
  70. }
  71. }
  72. // Every sent IndexUpdate should be matched by a corresponding index
  73. // message on the other side. Use the waitgroup to wait for this to
  74. // complete, as otherwise the Close below can race with the last
  75. // outgoing index message and the count between sent and received is
  76. // wrong.
  77. wg.Wait()
  78. c1.Close(io.EOF)
  79. c2.Close(io.EOF)
  80. <-c1.Closed()
  81. <-c2.Closed()
  82. if recvdEntries != sentEntries {
  83. t.Error("didn't receive all expected messages", recvdEntries, sentEntries)
  84. }
  85. }