replication.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package main
  7. import (
  8. "crypto/tls"
  9. "encoding/binary"
  10. "fmt"
  11. io "io"
  12. "log"
  13. "net"
  14. "time"
  15. "github.com/syncthing/syncthing/lib/protocol"
  16. )
  // Timing parameters for the replication connections.
  const (
  	// replicationReadTimeout is the read deadline the listener sets before
  	// waiting for each incoming record.
  	replicationReadTimeout = time.Minute

  	// replicationHeartbeatInterval is the period of the sender's heartbeat
  	// ticker.
  	replicationHeartbeatInterval = 30 * time.Second
  )
  19. type replicator interface {
  20. send(key string, addrs []DatabaseAddress, seen int64)
  21. }
  22. // a replicationSender tries to connect to the remote address and provide
  23. // them with a feed of replication updates.
  24. type replicationSender struct {
  25. dst string
  26. cert tls.Certificate // our certificate
  27. allowedIDs []protocol.DeviceID
  28. outbox chan ReplicationRecord
  29. stop chan struct{}
  30. }
  31. func newReplicationSender(dst string, cert tls.Certificate, allowedIDs []protocol.DeviceID) *replicationSender {
  32. return &replicationSender{
  33. dst: dst,
  34. cert: cert,
  35. allowedIDs: allowedIDs,
  36. outbox: make(chan ReplicationRecord, replicationOutboxSize),
  37. stop: make(chan struct{}),
  38. }
  39. }
  40. func (s *replicationSender) Serve() {
  41. // Sleep a little at startup. Peers often restart at the same time, and
  42. // this avoid the service failing and entering backoff state
  43. // unnecessarily, while also reducing the reconnect rate to something
  44. // reasonable by default.
  45. time.Sleep(2 * time.Second)
  46. tlsCfg := &tls.Config{
  47. Certificates: []tls.Certificate{s.cert},
  48. MinVersion: tls.VersionTLS12,
  49. InsecureSkipVerify: true,
  50. }
  51. // Dial the TLS connection.
  52. conn, err := tls.Dial("tcp", s.dst, tlsCfg)
  53. if err != nil {
  54. log.Println("Replication connect:", err)
  55. return
  56. }
  57. defer func() {
  58. conn.SetWriteDeadline(time.Now().Add(time.Second))
  59. conn.Close()
  60. }()
  61. // Get the other side device ID.
  62. remoteID, err := deviceID(conn)
  63. if err != nil {
  64. log.Println("Replication connect:", err)
  65. return
  66. }
  67. // Verify it's in the set of allowed device IDs.
  68. if !deviceIDIn(remoteID, s.allowedIDs) {
  69. log.Println("Replication connect: unexpected device ID:", remoteID)
  70. return
  71. }
  72. heartBeatTicker := time.NewTicker(replicationHeartbeatInterval)
  73. defer heartBeatTicker.Stop()
  74. // Send records.
  75. buf := make([]byte, 1024)
  76. for {
  77. select {
  78. case <-heartBeatTicker.C:
  79. if len(s.outbox) > 0 {
  80. // No need to send heartbeats if there are events/prevrious
  81. // heartbeats to send, they will keep the connection alive.
  82. continue
  83. }
  84. // Empty replication message is the heartbeat:
  85. s.outbox <- ReplicationRecord{}
  86. case rec := <-s.outbox:
  87. // Buffer must hold record plus four bytes for size
  88. size := rec.Size()
  89. if len(buf) < size+4 {
  90. buf = make([]byte, size+4)
  91. }
  92. // Record comes after the four bytes size
  93. n, err := rec.MarshalTo(buf[4:])
  94. if err != nil {
  95. // odd to get an error here, but we haven't sent anything
  96. // yet so it's not fatal
  97. replicationSendsTotal.WithLabelValues("error").Inc()
  98. log.Println("Replication marshal:", err)
  99. continue
  100. }
  101. binary.BigEndian.PutUint32(buf, uint32(n))
  102. // Send
  103. conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
  104. if _, err := conn.Write(buf[:4+n]); err != nil {
  105. replicationSendsTotal.WithLabelValues("error").Inc()
  106. log.Println("Replication write:", err)
  107. // Yes, we are loosing the replication event here.
  108. return
  109. }
  110. replicationSendsTotal.WithLabelValues("success").Inc()
  111. case <-s.stop:
  112. return
  113. }
  114. }
  115. }
  116. func (s *replicationSender) Stop() {
  117. close(s.stop)
  118. }
  119. func (s *replicationSender) String() string {
  120. return fmt.Sprintf("replicationSender(%q)", s.dst)
  121. }
  122. func (s *replicationSender) send(key string, ps []DatabaseAddress, seen int64) {
  123. item := ReplicationRecord{
  124. Key: key,
  125. Addresses: ps,
  126. }
  127. // The send should never block. The inbox is suitably buffered for at
  128. // least a few seconds of stalls, which shouldn't happen in practice.
  129. select {
  130. case s.outbox <- item:
  131. default:
  132. replicationSendsTotal.WithLabelValues("drop").Inc()
  133. }
  134. }
  135. // a replicationMultiplexer sends to multiple replicators
  136. type replicationMultiplexer []replicator
  137. func (m replicationMultiplexer) send(key string, ps []DatabaseAddress, seen int64) {
  138. for _, s := range m {
  139. // each send is nonblocking
  140. s.send(key, ps, seen)
  141. }
  142. }
  143. // replicationListener accepts incoming connections and reads replication
  144. // items from them. Incoming items are applied to the KV store.
  145. type replicationListener struct {
  146. addr string
  147. cert tls.Certificate
  148. allowedIDs []protocol.DeviceID
  149. db database
  150. stop chan struct{}
  151. }
  152. func newReplicationListener(addr string, cert tls.Certificate, allowedIDs []protocol.DeviceID, db database) *replicationListener {
  153. return &replicationListener{
  154. addr: addr,
  155. cert: cert,
  156. allowedIDs: allowedIDs,
  157. db: db,
  158. stop: make(chan struct{}),
  159. }
  160. }
  161. func (l *replicationListener) Serve() {
  162. tlsCfg := &tls.Config{
  163. Certificates: []tls.Certificate{l.cert},
  164. ClientAuth: tls.RequestClientCert,
  165. MinVersion: tls.VersionTLS12,
  166. InsecureSkipVerify: true,
  167. }
  168. lst, err := tls.Listen("tcp", l.addr, tlsCfg)
  169. if err != nil {
  170. log.Println("Replication listen:", err)
  171. return
  172. }
  173. defer lst.Close()
  174. for {
  175. select {
  176. case <-l.stop:
  177. return
  178. default:
  179. }
  180. // Accept a connection
  181. conn, err := lst.Accept()
  182. if err != nil {
  183. log.Println("Replication accept:", err)
  184. return
  185. }
  186. // Figure out the other side device ID
  187. remoteID, err := deviceID(conn.(*tls.Conn))
  188. if err != nil {
  189. log.Println("Replication accept:", err)
  190. conn.SetWriteDeadline(time.Now().Add(time.Second))
  191. conn.Close()
  192. continue
  193. }
  194. // Verify it is in the set of allowed device IDs
  195. if !deviceIDIn(remoteID, l.allowedIDs) {
  196. log.Println("Replication accept: unexpected device ID:", remoteID)
  197. conn.SetWriteDeadline(time.Now().Add(time.Second))
  198. conn.Close()
  199. continue
  200. }
  201. go l.handle(conn)
  202. }
  203. }
  204. func (l *replicationListener) Stop() {
  205. close(l.stop)
  206. }
  207. func (l *replicationListener) String() string {
  208. return fmt.Sprintf("replicationListener(%q)", l.addr)
  209. }
  210. func (l *replicationListener) handle(conn net.Conn) {
  211. defer func() {
  212. conn.SetWriteDeadline(time.Now().Add(time.Second))
  213. conn.Close()
  214. }()
  215. buf := make([]byte, 1024)
  216. for {
  217. select {
  218. case <-l.stop:
  219. return
  220. default:
  221. }
  222. conn.SetReadDeadline(time.Now().Add(replicationReadTimeout))
  223. // First four bytes are the size
  224. if _, err := io.ReadFull(conn, buf[:4]); err != nil {
  225. log.Println("Replication read size:", err)
  226. replicationRecvsTotal.WithLabelValues("error").Inc()
  227. return
  228. }
  229. // Read the rest of the record
  230. size := int(binary.BigEndian.Uint32(buf[:4]))
  231. if len(buf) < size {
  232. buf = make([]byte, size)
  233. }
  234. if size == 0 {
  235. // Heartbeat, ignore
  236. continue
  237. }
  238. if _, err := io.ReadFull(conn, buf[:size]); err != nil {
  239. log.Println("Replication read record:", err)
  240. replicationRecvsTotal.WithLabelValues("error").Inc()
  241. return
  242. }
  243. // Unmarshal
  244. var rec ReplicationRecord
  245. if err := rec.Unmarshal(buf[:size]); err != nil {
  246. log.Println("Replication unmarshal:", err)
  247. replicationRecvsTotal.WithLabelValues("error").Inc()
  248. continue
  249. }
  250. // Store
  251. l.db.merge(rec.Key, rec.Addresses, rec.Seen)
  252. replicationRecvsTotal.WithLabelValues("success").Inc()
  253. }
  254. }
  255. func deviceID(conn *tls.Conn) (protocol.DeviceID, error) {
  256. // Handshake may not be complete on the server side yet, which we need
  257. // to get the client certificate.
  258. if !conn.ConnectionState().HandshakeComplete {
  259. if err := conn.Handshake(); err != nil {
  260. return protocol.DeviceID{}, err
  261. }
  262. }
  263. // We expect exactly one certificate.
  264. certs := conn.ConnectionState().PeerCertificates
  265. if len(certs) != 1 {
  266. return protocol.DeviceID{}, fmt.Errorf("unexpected number of certificates (%d != 1)", len(certs))
  267. }
  268. return protocol.NewDeviceID(certs[0].Raw), nil
  269. }
  270. func deviceIDIn(id protocol.DeviceID, ids []protocol.DeviceID) bool {
  271. for _, candidate := range ids {
  272. if id == candidate {
  273. return true
  274. }
  275. }
  276. return false
  277. }