database.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. //go:generate go run ../../script/protofmt.go database.proto
  7. //go:generate protoc -I ../../ -I . --gogofast_out=. database.proto
  8. package main
  9. import (
  10. "sort"
  11. "time"
  12. "github.com/syndtr/goleveldb/leveldb"
  13. "github.com/syndtr/goleveldb/leveldb/util"
  14. )
  15. type clock interface {
  16. Now() time.Time
  17. }
  18. type defaultClock struct{}
  19. func (defaultClock) Now() time.Time {
  20. return time.Now()
  21. }
  22. type database interface {
  23. put(key string, rec DatabaseRecord) error
  24. merge(key string, addrs []DatabaseAddress, seen int64) error
  25. get(key string) (DatabaseRecord, error)
  26. }
  27. type levelDBStore struct {
  28. db *leveldb.DB
  29. inbox chan func()
  30. stop chan struct{}
  31. clock clock
  32. marshalBuf []byte
  33. }
  34. func newLevelDBStore(dir string) (*levelDBStore, error) {
  35. db, err := leveldb.OpenFile(dir, levelDBOptions)
  36. if err != nil {
  37. return nil, err
  38. }
  39. return &levelDBStore{
  40. db: db,
  41. inbox: make(chan func(), 16),
  42. stop: make(chan struct{}),
  43. clock: defaultClock{},
  44. }, nil
  45. }
  46. func (s *levelDBStore) put(key string, rec DatabaseRecord) error {
  47. t0 := time.Now()
  48. defer func() {
  49. databaseOperationSeconds.WithLabelValues(dbOpPut).Observe(time.Since(t0).Seconds())
  50. }()
  51. rc := make(chan error)
  52. s.inbox <- func() {
  53. size := rec.Size()
  54. if len(s.marshalBuf) < size {
  55. s.marshalBuf = make([]byte, size)
  56. }
  57. n, _ := rec.MarshalTo(s.marshalBuf)
  58. rc <- s.db.Put([]byte(key), s.marshalBuf[:n], nil)
  59. }
  60. err := <-rc
  61. if err != nil {
  62. databaseOperations.WithLabelValues(dbOpPut, dbResError).Inc()
  63. } else {
  64. databaseOperations.WithLabelValues(dbOpPut, dbResSuccess).Inc()
  65. }
  66. return err
  67. }
  68. func (s *levelDBStore) merge(key string, addrs []DatabaseAddress, seen int64) error {
  69. t0 := time.Now()
  70. defer func() {
  71. databaseOperationSeconds.WithLabelValues(dbOpMerge).Observe(time.Since(t0).Seconds())
  72. }()
  73. rc := make(chan error)
  74. newRec := DatabaseRecord{
  75. Addresses: addrs,
  76. Seen: seen,
  77. }
  78. s.inbox <- func() {
  79. // grab the existing record
  80. oldRec, err := s.get(key)
  81. if err != nil {
  82. // "not found" is not an error from get, so this is serious
  83. // stuff only
  84. rc <- err
  85. return
  86. }
  87. newRec = merge(newRec, oldRec)
  88. // We replicate s.put() functionality here ourselves instead of
  89. // calling it because we want to serialize our get above together
  90. // with the put in the same function.
  91. size := newRec.Size()
  92. if len(s.marshalBuf) < size {
  93. s.marshalBuf = make([]byte, size)
  94. }
  95. n, _ := newRec.MarshalTo(s.marshalBuf)
  96. rc <- s.db.Put([]byte(key), s.marshalBuf[:n], nil)
  97. }
  98. err := <-rc
  99. if err != nil {
  100. databaseOperations.WithLabelValues(dbOpMerge, dbResError).Inc()
  101. } else {
  102. databaseOperations.WithLabelValues(dbOpMerge, dbResSuccess).Inc()
  103. }
  104. return err
  105. }
  106. func (s *levelDBStore) get(key string) (DatabaseRecord, error) {
  107. t0 := time.Now()
  108. defer func() {
  109. databaseOperationSeconds.WithLabelValues(dbOpGet).Observe(time.Since(t0).Seconds())
  110. }()
  111. keyBs := []byte(key)
  112. val, err := s.db.Get(keyBs, nil)
  113. if err == leveldb.ErrNotFound {
  114. databaseOperations.WithLabelValues(dbOpGet, dbResNotFound).Inc()
  115. return DatabaseRecord{}, nil
  116. }
  117. if err != nil {
  118. databaseOperations.WithLabelValues(dbOpGet, dbResError).Inc()
  119. return DatabaseRecord{}, err
  120. }
  121. var rec DatabaseRecord
  122. if err := rec.Unmarshal(val); err != nil {
  123. databaseOperations.WithLabelValues(dbOpGet, dbResUnmarshalError).Inc()
  124. return DatabaseRecord{}, nil
  125. }
  126. rec.Addresses = expire(rec.Addresses, s.clock.Now().UnixNano())
  127. databaseOperations.WithLabelValues(dbOpGet, dbResSuccess).Inc()
  128. return rec, nil
  129. }
  130. func (s *levelDBStore) Serve() {
  131. t := time.NewTimer(0)
  132. defer t.Stop()
  133. defer s.db.Close()
  134. // Start the statistics serve routine. It will exit with us when
  135. // statisticsTrigger is closed.
  136. statisticsTrigger := make(chan struct{})
  137. statisticsDone := make(chan struct{})
  138. go s.statisticsServe(statisticsTrigger, statisticsDone)
  139. loop:
  140. for {
  141. select {
  142. case fn := <-s.inbox:
  143. // Run function in serialized order.
  144. fn()
  145. case <-t.C:
  146. // Trigger the statistics routine to do its thing in the
  147. // background.
  148. statisticsTrigger <- struct{}{}
  149. case <-statisticsDone:
  150. // The statistics routine is done with one iteratation, schedule
  151. // the next.
  152. t.Reset(databaseStatisticsInterval)
  153. case <-s.stop:
  154. // We're done.
  155. close(statisticsTrigger)
  156. break loop
  157. }
  158. }
  159. // Also wait for statisticsServe to return
  160. <-statisticsDone
  161. }
  162. func (s *levelDBStore) statisticsServe(trigger <-chan struct{}, done chan<- struct{}) {
  163. defer close(done)
  164. for range trigger {
  165. t0 := time.Now()
  166. nowNanos := t0.UnixNano()
  167. cutoff24h := t0.Add(-24 * time.Hour).UnixNano()
  168. cutoff1w := t0.Add(-7 * 24 * time.Hour).UnixNano()
  169. cutoff2Mon := t0.Add(-60 * 24 * time.Hour).UnixNano()
  170. current, last24h, last1w, inactive, errors := 0, 0, 0, 0, 0
  171. iter := s.db.NewIterator(&util.Range{}, nil)
  172. for iter.Next() {
  173. // Attempt to unmarshal the record and count the
  174. // failure if there's something wrong with it.
  175. var rec DatabaseRecord
  176. if err := rec.Unmarshal(iter.Value()); err != nil {
  177. errors++
  178. continue
  179. }
  180. // If there are addresses that have not expired it's a current
  181. // record, otherwise account it based on when it was last seen
  182. // (last 24 hours or last week) or finally as inactice.
  183. switch {
  184. case len(expire(rec.Addresses, nowNanos)) > 0:
  185. current++
  186. case rec.Seen > cutoff24h:
  187. last24h++
  188. case rec.Seen > cutoff1w:
  189. last1w++
  190. case rec.Seen > cutoff2Mon:
  191. inactive++
  192. case rec.Missed < cutoff2Mon:
  193. // It hasn't been seen lately and we haven't recorded
  194. // someone asking for this device in a long time either;
  195. // delete the record.
  196. if err := s.db.Delete(iter.Key(), nil); err != nil {
  197. databaseOperations.WithLabelValues(dbOpDelete, dbResError).Inc()
  198. } else {
  199. databaseOperations.WithLabelValues(dbOpDelete, dbResSuccess).Inc()
  200. }
  201. default:
  202. inactive++
  203. }
  204. }
  205. iter.Release()
  206. databaseKeys.WithLabelValues("current").Set(float64(current))
  207. databaseKeys.WithLabelValues("last24h").Set(float64(last24h))
  208. databaseKeys.WithLabelValues("last1w").Set(float64(last1w))
  209. databaseKeys.WithLabelValues("inactive").Set(float64(inactive))
  210. databaseKeys.WithLabelValues("error").Set(float64(errors))
  211. databaseStatisticsSeconds.Set(time.Since(t0).Seconds())
  212. // Signal that we are done and can be scheduled again.
  213. done <- struct{}{}
  214. }
  215. }
  216. func (s *levelDBStore) Stop() {
  217. close(s.stop)
  218. }
  219. // merge returns the merged result of the two database records a and b. The
  220. // result is the union of the two address sets, with the newer expiry time
  221. // chosen for any duplicates.
  222. func merge(a, b DatabaseRecord) DatabaseRecord {
  223. // Both lists must be sorted for this to work.
  224. sort.Slice(a.Addresses, func(i, j int) bool {
  225. return a.Addresses[i].Address < a.Addresses[j].Address
  226. })
  227. sort.Slice(b.Addresses, func(i, j int) bool {
  228. return b.Addresses[i].Address < b.Addresses[j].Address
  229. })
  230. res := DatabaseRecord{
  231. Addresses: make([]DatabaseAddress, 0, len(a.Addresses)+len(b.Addresses)),
  232. Seen: a.Seen,
  233. }
  234. if b.Seen > a.Seen {
  235. res.Seen = b.Seen
  236. }
  237. aIdx := 0
  238. bIdx := 0
  239. aAddrs := a.Addresses
  240. bAddrs := b.Addresses
  241. loop:
  242. for {
  243. switch {
  244. case aIdx == len(aAddrs) && bIdx == len(bAddrs):
  245. // both lists are exhausted, we are done
  246. break loop
  247. case aIdx == len(aAddrs):
  248. // a is exhausted, pick from b and continue
  249. res.Addresses = append(res.Addresses, bAddrs[bIdx])
  250. bIdx++
  251. continue
  252. case bIdx == len(bAddrs):
  253. // b is exhausted, pick from a and continue
  254. res.Addresses = append(res.Addresses, aAddrs[aIdx])
  255. aIdx++
  256. continue
  257. }
  258. // We have values left on both sides.
  259. aVal := aAddrs[aIdx]
  260. bVal := bAddrs[bIdx]
  261. switch {
  262. case aVal.Address == bVal.Address:
  263. // update for same address, pick newer
  264. if aVal.Expires > bVal.Expires {
  265. res.Addresses = append(res.Addresses, aVal)
  266. } else {
  267. res.Addresses = append(res.Addresses, bVal)
  268. }
  269. aIdx++
  270. bIdx++
  271. case aVal.Address < bVal.Address:
  272. // a is smallest, pick it and continue
  273. res.Addresses = append(res.Addresses, aVal)
  274. aIdx++
  275. default:
  276. // b is smallest, pick it and continue
  277. res.Addresses = append(res.Addresses, bVal)
  278. bIdx++
  279. }
  280. }
  281. return res
  282. }
  283. // expire returns the list of addresses after removing expired entries.
  284. // Expiration happen in place, so the slice given as the parameter is
  285. // destroyed. Internal order is not preserved.
  286. func expire(addrs []DatabaseAddress, now int64) []DatabaseAddress {
  287. i := 0
  288. for i < len(addrs) {
  289. if addrs[i].Expires < now {
  290. // This item is expired. Replace it with the last in the list
  291. // (noop if we are at the last item).
  292. addrs[i] = addrs[len(addrs)-1]
  293. // Wipe the last item of the list to release references to
  294. // strings and stuff.
  295. addrs[len(addrs)-1] = DatabaseAddress{}
  296. // Shorten the slice.
  297. addrs = addrs[:len(addrs)-1]
  298. continue
  299. }
  300. i++
  301. }
  302. return addrs
  303. }