database.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package main
  7. import (
  8. "bufio"
  9. "cmp"
  10. "context"
  11. "encoding/binary"
  12. "errors"
  13. "io"
  14. "log/slog"
  15. "os"
  16. "path"
  17. "runtime"
  18. "slices"
  19. "strings"
  20. "time"
  21. "github.com/puzpuzpuz/xsync/v3"
  22. "google.golang.org/protobuf/proto"
  23. "github.com/syncthing/syncthing/internal/blob"
  24. "github.com/syncthing/syncthing/internal/gen/discosrv"
  25. "github.com/syncthing/syncthing/internal/protoutil"
  26. "github.com/syncthing/syncthing/lib/protocol"
  27. "github.com/syncthing/syncthing/lib/rand"
  28. )
  29. type clock interface {
  30. Now() time.Time
  31. }
  32. type defaultClock struct{}
  33. func (defaultClock) Now() time.Time {
  34. return time.Now()
  35. }
  36. type database interface {
  37. put(key *protocol.DeviceID, rec *discosrv.DatabaseRecord) error
  38. merge(key *protocol.DeviceID, addrs []*discosrv.DatabaseAddress, seen int64) error
  39. get(key *protocol.DeviceID) (*discosrv.DatabaseRecord, error)
  40. }
  41. type inMemoryStore struct {
  42. m *xsync.MapOf[protocol.DeviceID, *discosrv.DatabaseRecord]
  43. dir string
  44. flushInterval time.Duration
  45. blobs blob.Store
  46. objKey string
  47. clock clock
  48. }
  49. func newInMemoryStore(dir string, flushInterval time.Duration, blobs blob.Store) *inMemoryStore {
  50. hn, err := os.Hostname()
  51. if err != nil {
  52. hn = rand.String(8)
  53. }
  54. s := &inMemoryStore{
  55. m: xsync.NewMapOf[protocol.DeviceID, *discosrv.DatabaseRecord](),
  56. dir: dir,
  57. flushInterval: flushInterval,
  58. blobs: blobs,
  59. objKey: hn + ".db",
  60. clock: defaultClock{},
  61. }
  62. nr, err := s.read()
  63. if os.IsNotExist(err) && blobs != nil {
  64. // Try to read from blob storage
  65. latestKey, cerr := blobs.LatestKey(context.Background())
  66. if cerr != nil {
  67. slog.Error("Failed to find database in blob storage", "error", cerr)
  68. return s
  69. }
  70. fd, cerr := os.Create(path.Join(s.dir, "records.db"))
  71. if cerr != nil {
  72. slog.Error("Failed to create database file", "error", cerr)
  73. return s
  74. }
  75. if cerr := blobs.Download(context.Background(), latestKey, fd); cerr != nil {
  76. slog.Error("Failed to download database from blob storage", "error", cerr)
  77. }
  78. _ = fd.Close()
  79. nr, err = s.read()
  80. }
  81. if err != nil {
  82. slog.Error("Failed to read database", "error", err)
  83. }
  84. slog.Info("Loaded database", "records", nr)
  85. s.expireAndCalculateStatistics()
  86. return s
  87. }
  88. func (s *inMemoryStore) put(key *protocol.DeviceID, rec *discosrv.DatabaseRecord) error {
  89. t0 := time.Now()
  90. s.m.Store(*key, rec)
  91. databaseOperations.WithLabelValues(dbOpPut, dbResSuccess).Inc()
  92. databaseOperationSeconds.WithLabelValues(dbOpPut).Observe(time.Since(t0).Seconds())
  93. return nil
  94. }
  95. func (s *inMemoryStore) merge(key *protocol.DeviceID, addrs []*discosrv.DatabaseAddress, seen int64) error {
  96. t0 := time.Now()
  97. newRec := &discosrv.DatabaseRecord{
  98. Addresses: addrs,
  99. Seen: seen,
  100. }
  101. if oldRec, ok := s.m.Load(*key); ok {
  102. newRec = merge(newRec, oldRec)
  103. }
  104. s.m.Store(*key, newRec)
  105. databaseOperations.WithLabelValues(dbOpMerge, dbResSuccess).Inc()
  106. databaseOperationSeconds.WithLabelValues(dbOpMerge).Observe(time.Since(t0).Seconds())
  107. return nil
  108. }
  109. func (s *inMemoryStore) get(key *protocol.DeviceID) (*discosrv.DatabaseRecord, error) {
  110. t0 := time.Now()
  111. defer func() {
  112. databaseOperationSeconds.WithLabelValues(dbOpGet).Observe(time.Since(t0).Seconds())
  113. }()
  114. rec, ok := s.m.Load(*key)
  115. if !ok {
  116. databaseOperations.WithLabelValues(dbOpGet, dbResNotFound).Inc()
  117. return &discosrv.DatabaseRecord{}, nil
  118. }
  119. naddresses, changed := expire(rec.Addresses, s.clock.Now())
  120. if changed {
  121. rec = &discosrv.DatabaseRecord{
  122. Addresses: naddresses,
  123. Seen: rec.Seen,
  124. }
  125. }
  126. databaseOperations.WithLabelValues(dbOpGet, dbResSuccess).Inc()
  127. return rec, nil
  128. }
  129. func (s *inMemoryStore) Serve(ctx context.Context) error {
  130. if s.flushInterval <= 0 {
  131. <-ctx.Done()
  132. return nil
  133. }
  134. t := time.NewTimer(s.flushInterval)
  135. defer t.Stop()
  136. loop:
  137. for {
  138. select {
  139. case <-t.C:
  140. slog.InfoContext(ctx, "Calculating statistics")
  141. s.expireAndCalculateStatistics()
  142. slog.InfoContext(ctx, "Flushing database")
  143. if err := s.write(); err != nil {
  144. slog.ErrorContext(ctx, "Failed to write database", "error", err)
  145. }
  146. slog.InfoContext(ctx, "Finished flushing database")
  147. t.Reset(s.flushInterval)
  148. case <-ctx.Done():
  149. // We're done.
  150. break loop
  151. }
  152. }
  153. return s.write()
  154. }
  155. func (s *inMemoryStore) expireAndCalculateStatistics() {
  156. now := s.clock.Now()
  157. cutoff24h := now.Add(-24 * time.Hour).UnixNano()
  158. cutoff1w := now.Add(-7 * 24 * time.Hour).UnixNano()
  159. current, currentIPv4, currentIPv6, currentIPv6GUA, last24h, last1w := 0, 0, 0, 0, 0, 0
  160. n := 0
  161. s.m.Range(func(key protocol.DeviceID, rec *discosrv.DatabaseRecord) bool {
  162. if n%1000 == 0 {
  163. runtime.Gosched()
  164. }
  165. n++
  166. addresses, changed := expire(rec.Addresses, now)
  167. if changed {
  168. rec = &discosrv.DatabaseRecord{
  169. Addresses: addresses,
  170. Seen: rec.Seen,
  171. }
  172. s.m.Store(key, rec)
  173. }
  174. switch {
  175. case len(rec.Addresses) > 0:
  176. current++
  177. seenIPv4, seenIPv6, seenIPv6GUA := false, false, false
  178. for _, addr := range rec.Addresses {
  179. // We do fast and loose matching on strings here instead of
  180. // parsing the address and the IP and doing "proper" checks,
  181. // to keep things fast and generate less garbage.
  182. if strings.Contains(addr.Address, "[") {
  183. seenIPv6 = true
  184. if strings.Contains(addr.Address, "[2") {
  185. seenIPv6GUA = true
  186. }
  187. } else {
  188. seenIPv4 = true
  189. }
  190. if seenIPv4 && seenIPv6 && seenIPv6GUA {
  191. break
  192. }
  193. }
  194. if seenIPv4 {
  195. currentIPv4++
  196. }
  197. if seenIPv6 {
  198. currentIPv6++
  199. }
  200. if seenIPv6GUA {
  201. currentIPv6GUA++
  202. }
  203. case rec.Seen > cutoff24h:
  204. last24h++
  205. case rec.Seen > cutoff1w:
  206. last1w++
  207. default:
  208. // drop the record if it's older than a week
  209. s.m.Delete(key)
  210. }
  211. return true
  212. })
  213. databaseKeys.WithLabelValues("current").Set(float64(current))
  214. databaseKeys.WithLabelValues("currentIPv4").Set(float64(currentIPv4))
  215. databaseKeys.WithLabelValues("currentIPv6").Set(float64(currentIPv6))
  216. databaseKeys.WithLabelValues("currentIPv6GUA").Set(float64(currentIPv6GUA))
  217. databaseKeys.WithLabelValues("last24h").Set(float64(last24h))
  218. databaseKeys.WithLabelValues("last1w").Set(float64(last1w))
  219. databaseStatisticsSeconds.Set(time.Since(now).Seconds())
  220. }
  221. func (s *inMemoryStore) write() (err error) {
  222. t0 := time.Now()
  223. defer func() {
  224. if err == nil {
  225. databaseWriteSeconds.Set(time.Since(t0).Seconds())
  226. databaseLastWritten.Set(float64(t0.Unix()))
  227. }
  228. }()
  229. dbf := path.Join(s.dir, "records.db")
  230. fd, err := os.Create(dbf + ".tmp")
  231. if err != nil {
  232. return err
  233. }
  234. bw := bufio.NewWriterSize(fd, 1<<20)
  235. var buf []byte
  236. var rangeErr error
  237. now := s.clock.Now()
  238. cutoff1w := now.Add(-7 * 24 * time.Hour).UnixNano()
  239. n := 0
  240. s.m.Range(func(key protocol.DeviceID, value *discosrv.DatabaseRecord) bool {
  241. if n%1000 == 0 {
  242. runtime.Gosched()
  243. }
  244. n++
  245. if value.Seen < cutoff1w {
  246. // drop the record if it's older than a week
  247. return true
  248. }
  249. rec := &discosrv.ReplicationRecord{
  250. Key: key[:],
  251. Addresses: value.Addresses,
  252. Seen: value.Seen,
  253. }
  254. s := proto.Size(rec)
  255. if s+4 > len(buf) {
  256. buf = make([]byte, s+4)
  257. }
  258. n, err := protoutil.MarshalTo(buf[4:], rec)
  259. if err != nil {
  260. rangeErr = err
  261. return false
  262. }
  263. binary.BigEndian.PutUint32(buf, uint32(n))
  264. if _, err := bw.Write(buf[:n+4]); err != nil {
  265. rangeErr = err
  266. return false
  267. }
  268. return true
  269. })
  270. if rangeErr != nil {
  271. _ = fd.Close()
  272. return rangeErr
  273. }
  274. if err := bw.Flush(); err != nil {
  275. _ = fd.Close
  276. return err
  277. }
  278. if err := fd.Close(); err != nil {
  279. return err
  280. }
  281. if err := os.Rename(dbf+".tmp", dbf); err != nil {
  282. return err
  283. }
  284. if info, err := os.Lstat(dbf); err == nil {
  285. slog.Info("Saved database", "name", dbf, "size", info.Size(), "modtime", info.ModTime())
  286. } else {
  287. slog.Warn("Failed to stat database after save", "error", err)
  288. }
  289. // Upload to blob storage
  290. if s.blobs != nil {
  291. fd, err = os.Open(dbf)
  292. if err != nil {
  293. slog.Error("Failed to upload database to blob storage", "error", err)
  294. return nil
  295. }
  296. defer fd.Close()
  297. if err := s.blobs.Upload(context.Background(), s.objKey, fd); err != nil {
  298. slog.Error("Failed to upload database to blob storage", "error", err)
  299. }
  300. slog.Info("Finished uploading database")
  301. }
  302. return nil
  303. }
  304. func (s *inMemoryStore) read() (int, error) {
  305. fd, err := os.Open(path.Join(s.dir, "records.db"))
  306. if err != nil {
  307. return 0, err
  308. }
  309. defer fd.Close()
  310. br := bufio.NewReader(fd)
  311. var buf []byte
  312. nr := 0
  313. for {
  314. var n uint32
  315. if err := binary.Read(br, binary.BigEndian, &n); err != nil {
  316. if errors.Is(err, io.EOF) {
  317. break
  318. }
  319. return nr, err
  320. }
  321. if int(n) > len(buf) {
  322. buf = make([]byte, n)
  323. }
  324. if _, err := io.ReadFull(br, buf[:n]); err != nil {
  325. return nr, err
  326. }
  327. rec := &discosrv.ReplicationRecord{}
  328. if err := proto.Unmarshal(buf[:n], rec); err != nil {
  329. return nr, err
  330. }
  331. key, err := protocol.DeviceIDFromBytes(rec.Key)
  332. if err != nil {
  333. key, err = protocol.DeviceIDFromString(string(rec.Key))
  334. }
  335. if err != nil {
  336. slog.Error("Got bad device ID while reading database", "error", err)
  337. continue
  338. }
  339. slices.SortFunc(rec.Addresses, Cmp)
  340. rec.Addresses, _ = expire(slices.CompactFunc(rec.Addresses, Equal), s.clock.Now())
  341. s.m.Store(key, &discosrv.DatabaseRecord{
  342. Addresses: rec.Addresses,
  343. Seen: rec.Seen,
  344. })
  345. nr++
  346. }
  347. return nr, nil
  348. }
  349. // merge returns the merged result of the two database records a and b. The
  350. // result is the union of the two address sets, with the newer expiry time
  351. // chosen for any duplicates. The address list in a is overwritten and
  352. // reused for the result; b is not modified.
  353. func merge(a, b *discosrv.DatabaseRecord) *discosrv.DatabaseRecord {
  354. // Both lists must be sorted for this to work.
  355. a.Seen = max(a.Seen, b.Seen)
  356. aIdx := 0
  357. bIdx := 0
  358. for aIdx < len(a.Addresses) && bIdx < len(b.Addresses) {
  359. switch cmp.Compare(a.Addresses[aIdx].Address, b.Addresses[bIdx].Address) {
  360. case 0:
  361. // a == b, choose the newer expiry time
  362. a.Addresses[aIdx].Expires = max(a.Addresses[aIdx].Expires, b.Addresses[bIdx].Expires)
  363. aIdx++
  364. bIdx++
  365. case -1:
  366. // a < b, keep a and move on
  367. aIdx++
  368. case 1:
  369. // a > b, insert b before a
  370. a.Addresses = append(a.Addresses[:aIdx], append([]*discosrv.DatabaseAddress{b.Addresses[bIdx]}, a.Addresses[aIdx:]...)...)
  371. bIdx++
  372. }
  373. }
  374. if bIdx < len(b.Addresses) {
  375. a.Addresses = append(a.Addresses, b.Addresses[bIdx:]...)
  376. }
  377. return a
  378. }
  379. // expire returns the list of addresses after removing expired entries. A
  380. // new slice is allocated if any changes are required, and the changed
  381. // boolean indicates whether that happened or not.
  382. func expire(addrs []*discosrv.DatabaseAddress, now time.Time) (result []*discosrv.DatabaseAddress, changed bool) {
  383. cutoff := now.UnixNano()
  384. remains := 0
  385. for _, a := range addrs {
  386. if a.Expires < cutoff {
  387. changed = true
  388. } else {
  389. remains++
  390. }
  391. }
  392. if !changed {
  393. return addrs, false
  394. }
  395. if remains == 0 {
  396. return nil, true
  397. }
  398. naddrs := make([]*discosrv.DatabaseAddress, 0, remains)
  399. for _, a := range addrs {
  400. if a.Expires >= cutoff {
  401. naddrs = append(naddrs, a)
  402. }
  403. }
  404. return naddrs, true
  405. }
  406. func Cmp(d, other *discosrv.DatabaseAddress) (n int) {
  407. if c := cmp.Compare(d.Address, other.Address); c != 0 {
  408. return c
  409. }
  410. return cmp.Compare(d.Expires, other.Expires)
  411. }
  412. func Equal(d, other *discosrv.DatabaseAddress) bool {
  413. return d.Address == other.Address
  414. }