amqp.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Copyright (C) 2024 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package main
  7. import (
  8. "context"
  9. "fmt"
  10. "io"
  11. "log"
  12. amqp "github.com/rabbitmq/amqp091-go"
  13. "github.com/thejerf/suture/v4"
  14. "google.golang.org/protobuf/proto"
  15. "github.com/syncthing/syncthing/internal/gen/discosrv"
  16. "github.com/syncthing/syncthing/internal/protoutil"
  17. "github.com/syncthing/syncthing/lib/protocol"
  18. )
  19. type amqpReplicator struct {
  20. suture.Service
  21. broker string
  22. sender *amqpSender
  23. receiver *amqpReceiver
  24. outbox chan *discosrv.ReplicationRecord
  25. }
  26. func newAMQPReplicator(broker, clientID string, db database) *amqpReplicator {
  27. svc := suture.New("amqpReplicator", suture.Spec{PassThroughPanics: true})
  28. sender := &amqpSender{
  29. broker: broker,
  30. clientID: clientID,
  31. outbox: make(chan *discosrv.ReplicationRecord, replicationOutboxSize),
  32. }
  33. svc.Add(sender)
  34. receiver := &amqpReceiver{
  35. broker: broker,
  36. clientID: clientID,
  37. db: db,
  38. }
  39. svc.Add(receiver)
  40. return &amqpReplicator{
  41. Service: svc,
  42. broker: broker,
  43. sender: sender,
  44. receiver: receiver,
  45. outbox: make(chan *discosrv.ReplicationRecord, replicationOutboxSize),
  46. }
  47. }
  48. func (s *amqpReplicator) send(key *protocol.DeviceID, ps []*discosrv.DatabaseAddress, seen int64) {
  49. s.sender.send(key, ps, seen)
  50. }
  51. type amqpSender struct {
  52. broker string
  53. clientID string
  54. outbox chan *discosrv.ReplicationRecord
  55. }
  56. func (s *amqpSender) Serve(ctx context.Context) error {
  57. conn, ch, err := amqpChannel(s.broker)
  58. if err != nil {
  59. return err
  60. }
  61. defer ch.Close()
  62. defer conn.Close()
  63. buf := make([]byte, 1024)
  64. for {
  65. select {
  66. case rec := <-s.outbox:
  67. size := proto.Size(rec)
  68. if len(buf) < size {
  69. buf = make([]byte, size)
  70. }
  71. n, err := protoutil.MarshalTo(buf, rec)
  72. if err != nil {
  73. replicationSendsTotal.WithLabelValues("error").Inc()
  74. return fmt.Errorf("replication marshal: %w", err)
  75. }
  76. err = ch.PublishWithContext(ctx,
  77. "discovery", // exchange
  78. "", // routing key
  79. false, // mandatory
  80. false, // immediate
  81. amqp.Publishing{
  82. ContentType: "application/protobuf",
  83. Body: buf[:n],
  84. AppId: s.clientID,
  85. })
  86. if err != nil {
  87. replicationSendsTotal.WithLabelValues("error").Inc()
  88. return fmt.Errorf("replication publish: %w", err)
  89. }
  90. replicationSendsTotal.WithLabelValues("success").Inc()
  91. case <-ctx.Done():
  92. return nil
  93. }
  94. }
  95. }
  96. func (s *amqpSender) String() string {
  97. return fmt.Sprintf("amqpSender(%q)", s.broker)
  98. }
  99. func (s *amqpSender) send(key *protocol.DeviceID, ps []*discosrv.DatabaseAddress, seen int64) {
  100. item := &discosrv.ReplicationRecord{
  101. Key: key[:],
  102. Addresses: ps,
  103. Seen: seen,
  104. }
  105. // The send should never block. The inbox is suitably buffered for at
  106. // least a few seconds of stalls, which shouldn't happen in practice.
  107. select {
  108. case s.outbox <- item:
  109. default:
  110. replicationSendsTotal.WithLabelValues("drop").Inc()
  111. }
  112. }
  113. type amqpReceiver struct {
  114. broker string
  115. clientID string
  116. db database
  117. }
  118. func (s *amqpReceiver) Serve(ctx context.Context) error {
  119. conn, ch, err := amqpChannel(s.broker)
  120. if err != nil {
  121. return err
  122. }
  123. defer ch.Close()
  124. defer conn.Close()
  125. msgs, err := amqpConsume(ch)
  126. if err != nil {
  127. return err
  128. }
  129. for {
  130. select {
  131. case msg, ok := <-msgs:
  132. if !ok {
  133. return fmt.Errorf("subscription closed: %w", io.EOF)
  134. }
  135. // ignore messages from ourself
  136. if msg.AppId == s.clientID {
  137. continue
  138. }
  139. var rec discosrv.ReplicationRecord
  140. if err := proto.Unmarshal(msg.Body, &rec); err != nil {
  141. replicationRecvsTotal.WithLabelValues("error").Inc()
  142. return fmt.Errorf("replication unmarshal: %w", err)
  143. }
  144. id, err := protocol.DeviceIDFromBytes(rec.Key)
  145. if err != nil {
  146. id, err = protocol.DeviceIDFromString(string(rec.Key))
  147. }
  148. if err != nil {
  149. log.Println("Replication device ID:", err)
  150. replicationRecvsTotal.WithLabelValues("error").Inc()
  151. continue
  152. }
  153. if err := s.db.merge(&id, rec.Addresses, rec.Seen); err != nil {
  154. return fmt.Errorf("replication database merge: %w", err)
  155. }
  156. replicationRecvsTotal.WithLabelValues("success").Inc()
  157. case <-ctx.Done():
  158. return nil
  159. }
  160. }
  161. }
  162. func (s *amqpReceiver) String() string {
  163. return fmt.Sprintf("amqpReceiver(%q)", s.broker)
  164. }
  165. func amqpChannel(dst string) (*amqp.Connection, *amqp.Channel, error) {
  166. conn, err := amqp.Dial(dst)
  167. if err != nil {
  168. return nil, nil, fmt.Errorf("AMQP dial: %w", err)
  169. }
  170. ch, err := conn.Channel()
  171. if err != nil {
  172. return nil, nil, fmt.Errorf("AMQP channel: %w", err)
  173. }
  174. err = ch.ExchangeDeclare(
  175. "discovery", // name
  176. "fanout", // type
  177. false, // durable
  178. false, // auto-deleted
  179. false, // internal
  180. false, // no-wait
  181. nil, // arguments
  182. )
  183. if err != nil {
  184. return nil, nil, fmt.Errorf("AMQP declare exchange: %w", err)
  185. }
  186. return conn, ch, nil
  187. }
  188. func amqpConsume(ch *amqp.Channel) (<-chan amqp.Delivery, error) {
  189. q, err := ch.QueueDeclare(
  190. "", // name
  191. false, // durable
  192. false, // delete when unused
  193. true, // exclusive
  194. false, // no-wait
  195. nil, // arguments
  196. )
  197. if err != nil {
  198. return nil, fmt.Errorf("AMQP declare queue: %w", err)
  199. }
  200. err = ch.QueueBind(
  201. q.Name, // queue name
  202. "", // routing key
  203. "discovery", // exchange
  204. false,
  205. nil,
  206. )
  207. if err != nil {
  208. return nil, fmt.Errorf("AMQP bind queue: %w", err)
  209. }
  210. msgs, err := ch.Consume(
  211. q.Name, // queue
  212. "", // consumer
  213. true, // auto-ack
  214. false, // exclusive
  215. false, // no-local
  216. false, // no-wait
  217. nil, // args
  218. )
  219. if err != nil {
  220. return nil, fmt.Errorf("AMQP consume: %w", err)
  221. }
  222. return msgs, nil
  223. }