amqp.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Copyright (C) 2024 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package main
  7. import (
  8. "bytes"
  9. "context"
  10. "fmt"
  11. "io"
  12. "log"
  13. amqp "github.com/rabbitmq/amqp091-go"
  14. "github.com/syncthing/syncthing/lib/protocol"
  15. "github.com/thejerf/suture/v4"
  16. )
  17. type amqpReplicator struct {
  18. suture.Service
  19. broker string
  20. sender *amqpSender
  21. receiver *amqpReceiver
  22. outbox chan ReplicationRecord
  23. }
  24. func newAMQPReplicator(broker, clientID string, db database) *amqpReplicator {
  25. svc := suture.New("amqpReplicator", suture.Spec{PassThroughPanics: true})
  26. sender := &amqpSender{
  27. broker: broker,
  28. clientID: clientID,
  29. outbox: make(chan ReplicationRecord, replicationOutboxSize),
  30. }
  31. svc.Add(sender)
  32. receiver := &amqpReceiver{
  33. broker: broker,
  34. clientID: clientID,
  35. db: db,
  36. }
  37. svc.Add(receiver)
  38. return &amqpReplicator{
  39. Service: svc,
  40. broker: broker,
  41. sender: sender,
  42. receiver: receiver,
  43. outbox: make(chan ReplicationRecord, replicationOutboxSize),
  44. }
  45. }
  46. func (s *amqpReplicator) send(key *protocol.DeviceID, ps []DatabaseAddress, seen int64) {
  47. s.sender.send(key, ps, seen)
  48. }
  49. type amqpSender struct {
  50. broker string
  51. clientID string
  52. outbox chan ReplicationRecord
  53. }
  54. func (s *amqpSender) Serve(ctx context.Context) error {
  55. conn, ch, err := amqpChannel(s.broker)
  56. if err != nil {
  57. return err
  58. }
  59. defer ch.Close()
  60. defer conn.Close()
  61. buf := make([]byte, 1024)
  62. for {
  63. select {
  64. case rec := <-s.outbox:
  65. size := rec.Size()
  66. if len(buf) < size {
  67. buf = make([]byte, size)
  68. }
  69. n, err := rec.MarshalTo(buf)
  70. if err != nil {
  71. replicationSendsTotal.WithLabelValues("error").Inc()
  72. return fmt.Errorf("replication marshal: %w", err)
  73. }
  74. err = ch.PublishWithContext(ctx,
  75. "discovery", // exchange
  76. "", // routing key
  77. false, // mandatory
  78. false, // immediate
  79. amqp.Publishing{
  80. ContentType: "application/protobuf",
  81. Body: buf[:n],
  82. AppId: s.clientID,
  83. })
  84. if err != nil {
  85. replicationSendsTotal.WithLabelValues("error").Inc()
  86. return fmt.Errorf("replication publish: %w", err)
  87. }
  88. replicationSendsTotal.WithLabelValues("success").Inc()
  89. case <-ctx.Done():
  90. return nil
  91. }
  92. }
  93. }
  94. func (s *amqpSender) String() string {
  95. return fmt.Sprintf("amqpSender(%q)", s.broker)
  96. }
  97. func (s *amqpSender) send(key *protocol.DeviceID, ps []DatabaseAddress, seen int64) {
  98. item := ReplicationRecord{
  99. Key: key[:],
  100. Addresses: ps,
  101. Seen: seen,
  102. }
  103. // The send should never block. The inbox is suitably buffered for at
  104. // least a few seconds of stalls, which shouldn't happen in practice.
  105. select {
  106. case s.outbox <- item:
  107. default:
  108. replicationSendsTotal.WithLabelValues("drop").Inc()
  109. }
  110. }
  111. type amqpReceiver struct {
  112. broker string
  113. clientID string
  114. db database
  115. }
  116. func (s *amqpReceiver) Serve(ctx context.Context) error {
  117. conn, ch, err := amqpChannel(s.broker)
  118. if err != nil {
  119. return err
  120. }
  121. defer ch.Close()
  122. defer conn.Close()
  123. msgs, err := amqpConsume(ch)
  124. if err != nil {
  125. return err
  126. }
  127. for {
  128. select {
  129. case msg, ok := <-msgs:
  130. if !ok {
  131. return fmt.Errorf("subscription closed: %w", io.EOF)
  132. }
  133. // ignore messages from ourself
  134. if msg.AppId == s.clientID {
  135. continue
  136. }
  137. var rec ReplicationRecord
  138. if err := rec.Unmarshal(msg.Body); err != nil {
  139. replicationRecvsTotal.WithLabelValues("error").Inc()
  140. return fmt.Errorf("replication unmarshal: %w", err)
  141. }
  142. if bytes.Equal(rec.Key, []byte("<heartbeat>")) {
  143. continue
  144. }
  145. id, err := protocol.DeviceIDFromBytes(rec.Key)
  146. if err != nil {
  147. id, err = protocol.DeviceIDFromString(string(rec.Key))
  148. }
  149. if err != nil {
  150. log.Println("Replication device ID:", err)
  151. replicationRecvsTotal.WithLabelValues("error").Inc()
  152. continue
  153. }
  154. if err := s.db.merge(&id, rec.Addresses, rec.Seen); err != nil {
  155. return fmt.Errorf("replication database merge: %w", err)
  156. }
  157. replicationRecvsTotal.WithLabelValues("success").Inc()
  158. case <-ctx.Done():
  159. return nil
  160. }
  161. }
  162. }
  163. func (s *amqpReceiver) String() string {
  164. return fmt.Sprintf("amqpReceiver(%q)", s.broker)
  165. }
  166. func amqpChannel(dst string) (*amqp.Connection, *amqp.Channel, error) {
  167. conn, err := amqp.Dial(dst)
  168. if err != nil {
  169. return nil, nil, fmt.Errorf("AMQP dial: %w", err)
  170. }
  171. ch, err := conn.Channel()
  172. if err != nil {
  173. return nil, nil, fmt.Errorf("AMQP channel: %w", err)
  174. }
  175. err = ch.ExchangeDeclare(
  176. "discovery", // name
  177. "fanout", // type
  178. false, // durable
  179. false, // auto-deleted
  180. false, // internal
  181. false, // no-wait
  182. nil, // arguments
  183. )
  184. if err != nil {
  185. return nil, nil, fmt.Errorf("AMQP declare exchange: %w", err)
  186. }
  187. return conn, ch, nil
  188. }
  189. func amqpConsume(ch *amqp.Channel) (<-chan amqp.Delivery, error) {
  190. q, err := ch.QueueDeclare(
  191. "", // name
  192. false, // durable
  193. false, // delete when unused
  194. true, // exclusive
  195. false, // no-wait
  196. nil, // arguments
  197. )
  198. if err != nil {
  199. return nil, fmt.Errorf("AMQP declare queue: %w", err)
  200. }
  201. err = ch.QueueBind(
  202. q.Name, // queue name
  203. "", // routing key
  204. "discovery", // exchange
  205. false,
  206. nil,
  207. )
  208. if err != nil {
  209. return nil, fmt.Errorf("AMQP bind queue: %w", err)
  210. }
  211. msgs, err := ch.Consume(
  212. q.Name, // queue
  213. "", // consumer
  214. true, // auto-ack
  215. false, // exclusive
  216. false, // no-local
  217. false, // no-wait
  218. nil, // args
  219. )
  220. if err != nil {
  221. return nil, fmt.Errorf("AMQP consume: %w", err)
  222. }
  223. return msgs, nil
  224. }