database.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. //go:generate go run ../../proto/scripts/protofmt.go database.proto
  7. //go:generate protoc -I ../../ -I . --gogofast_out=. database.proto
  8. package main
  9. import (
  10. "log"
  11. "sort"
  12. "time"
  13. "github.com/syndtr/goleveldb/leveldb"
  14. "github.com/syndtr/goleveldb/leveldb/util"
  15. )
  16. type clock interface {
  17. Now() time.Time
  18. }
  19. type defaultClock struct{}
  20. func (defaultClock) Now() time.Time {
  21. return time.Now()
  22. }
  23. type database interface {
  24. put(key string, rec DatabaseRecord) error
  25. merge(key string, addrs []DatabaseAddress, seen int64) error
  26. get(key string) (DatabaseRecord, error)
  27. }
  28. type levelDBStore struct {
  29. db *leveldb.DB
  30. inbox chan func()
  31. stop chan struct{}
  32. clock clock
  33. marshalBuf []byte
  34. }
  35. func newLevelDBStore(dir string) (*levelDBStore, error) {
  36. db, err := leveldb.OpenFile(dir, levelDBOptions)
  37. if err != nil {
  38. return nil, err
  39. }
  40. return &levelDBStore{
  41. db: db,
  42. inbox: make(chan func(), 16),
  43. stop: make(chan struct{}),
  44. clock: defaultClock{},
  45. }, nil
  46. }
  47. func (s *levelDBStore) put(key string, rec DatabaseRecord) error {
  48. t0 := time.Now()
  49. defer func() {
  50. databaseOperationSeconds.WithLabelValues(dbOpPut).Observe(time.Since(t0).Seconds())
  51. }()
  52. rc := make(chan error)
  53. s.inbox <- func() {
  54. size := rec.Size()
  55. if len(s.marshalBuf) < size {
  56. s.marshalBuf = make([]byte, size)
  57. }
  58. n, _ := rec.MarshalTo(s.marshalBuf)
  59. rc <- s.db.Put([]byte(key), s.marshalBuf[:n], nil)
  60. }
  61. err := <-rc
  62. if err != nil {
  63. databaseOperations.WithLabelValues(dbOpPut, dbResError).Inc()
  64. } else {
  65. databaseOperations.WithLabelValues(dbOpPut, dbResSuccess).Inc()
  66. }
  67. return err
  68. }
  69. func (s *levelDBStore) merge(key string, addrs []DatabaseAddress, seen int64) error {
  70. t0 := time.Now()
  71. defer func() {
  72. databaseOperationSeconds.WithLabelValues(dbOpMerge).Observe(time.Since(t0).Seconds())
  73. }()
  74. rc := make(chan error)
  75. newRec := DatabaseRecord{
  76. Addresses: addrs,
  77. Seen: seen,
  78. }
  79. s.inbox <- func() {
  80. // grab the existing record
  81. oldRec, err := s.get(key)
  82. if err != nil {
  83. // "not found" is not an error from get, so this is serious
  84. // stuff only
  85. rc <- err
  86. return
  87. }
  88. newRec = merge(newRec, oldRec)
  89. // We replicate s.put() functionality here ourselves instead of
  90. // calling it because we want to serialize our get above together
  91. // with the put in the same function.
  92. size := newRec.Size()
  93. if len(s.marshalBuf) < size {
  94. s.marshalBuf = make([]byte, size)
  95. }
  96. n, _ := newRec.MarshalTo(s.marshalBuf)
  97. rc <- s.db.Put([]byte(key), s.marshalBuf[:n], nil)
  98. }
  99. err := <-rc
  100. if err != nil {
  101. databaseOperations.WithLabelValues(dbOpMerge, dbResError).Inc()
  102. } else {
  103. databaseOperations.WithLabelValues(dbOpMerge, dbResSuccess).Inc()
  104. }
  105. return err
  106. }
  107. func (s *levelDBStore) get(key string) (DatabaseRecord, error) {
  108. t0 := time.Now()
  109. defer func() {
  110. databaseOperationSeconds.WithLabelValues(dbOpGet).Observe(time.Since(t0).Seconds())
  111. }()
  112. keyBs := []byte(key)
  113. val, err := s.db.Get(keyBs, nil)
  114. if err == leveldb.ErrNotFound {
  115. databaseOperations.WithLabelValues(dbOpGet, dbResNotFound).Inc()
  116. return DatabaseRecord{}, nil
  117. }
  118. if err != nil {
  119. databaseOperations.WithLabelValues(dbOpGet, dbResError).Inc()
  120. return DatabaseRecord{}, err
  121. }
  122. var rec DatabaseRecord
  123. if err := rec.Unmarshal(val); err != nil {
  124. databaseOperations.WithLabelValues(dbOpGet, dbResUnmarshalError).Inc()
  125. return DatabaseRecord{}, nil
  126. }
  127. rec.Addresses = expire(rec.Addresses, s.clock.Now().UnixNano())
  128. databaseOperations.WithLabelValues(dbOpGet, dbResSuccess).Inc()
  129. return rec, nil
  130. }
  131. func (s *levelDBStore) Serve() {
  132. t := time.NewTimer(0)
  133. defer t.Stop()
  134. defer s.db.Close()
  135. // Start the statistics serve routine. It will exit with us when
  136. // statisticsTrigger is closed.
  137. statisticsTrigger := make(chan struct{})
  138. statisticsDone := make(chan struct{})
  139. go s.statisticsServe(statisticsTrigger, statisticsDone)
  140. loop:
  141. for {
  142. select {
  143. case fn := <-s.inbox:
  144. // Run function in serialized order.
  145. fn()
  146. case <-t.C:
  147. // Trigger the statistics routine to do its thing in the
  148. // background.
  149. statisticsTrigger <- struct{}{}
  150. case <-statisticsDone:
  151. // The statistics routine is done with one iteratation, schedule
  152. // the next.
  153. t.Reset(databaseStatisticsInterval)
  154. case <-s.stop:
  155. // We're done.
  156. close(statisticsTrigger)
  157. break loop
  158. }
  159. }
  160. // Also wait for statisticsServe to return
  161. <-statisticsDone
  162. }
  163. func (s *levelDBStore) statisticsServe(trigger <-chan struct{}, done chan<- struct{}) {
  164. defer close(done)
  165. for range trigger {
  166. t0 := time.Now()
  167. nowNanos := t0.UnixNano()
  168. cutoff24h := t0.Add(-24 * time.Hour).UnixNano()
  169. cutoff1w := t0.Add(-7 * 24 * time.Hour).UnixNano()
  170. cutoff2Mon := t0.Add(-60 * 24 * time.Hour).UnixNano()
  171. current, last24h, last1w, inactive, errors := 0, 0, 0, 0, 0
  172. iter := s.db.NewIterator(&util.Range{}, nil)
  173. for iter.Next() {
  174. // Attempt to unmarshal the record and count the
  175. // failure if there's something wrong with it.
  176. var rec DatabaseRecord
  177. if err := rec.Unmarshal(iter.Value()); err != nil {
  178. errors++
  179. continue
  180. }
  181. // If there are addresses that have not expired it's a current
  182. // record, otherwise account it based on when it was last seen
  183. // (last 24 hours or last week) or finally as inactice.
  184. switch {
  185. case len(expire(rec.Addresses, nowNanos)) > 0:
  186. current++
  187. case rec.Seen > cutoff24h:
  188. last24h++
  189. case rec.Seen > cutoff1w:
  190. last1w++
  191. case rec.Seen > cutoff2Mon:
  192. inactive++
  193. case rec.Missed < cutoff2Mon:
  194. // It hasn't been seen lately and we haven't recorded
  195. // someone asking for this device in a long time either;
  196. // delete the record.
  197. if err := s.db.Delete(iter.Key(), nil); err != nil {
  198. databaseOperations.WithLabelValues(dbOpDelete, dbResError).Inc()
  199. } else {
  200. databaseOperations.WithLabelValues(dbOpDelete, dbResSuccess).Inc()
  201. }
  202. default:
  203. inactive++
  204. }
  205. }
  206. iter.Release()
  207. databaseKeys.WithLabelValues("current").Set(float64(current))
  208. databaseKeys.WithLabelValues("last24h").Set(float64(last24h))
  209. databaseKeys.WithLabelValues("last1w").Set(float64(last1w))
  210. databaseKeys.WithLabelValues("inactive").Set(float64(inactive))
  211. databaseKeys.WithLabelValues("error").Set(float64(errors))
  212. databaseStatisticsSeconds.Set(time.Since(t0).Seconds())
  213. // Signal that we are done and can be scheduled again.
  214. done <- struct{}{}
  215. }
  216. }
  217. func (s *levelDBStore) Stop() {
  218. close(s.stop)
  219. }
  220. // merge returns the merged result of the two database records a and b. The
  221. // result is the union of the two address sets, with the newer expiry time
  222. // chosen for any duplicates.
  223. func merge(a, b DatabaseRecord) DatabaseRecord {
  224. // Both lists must be sorted for this to work.
  225. if !sort.IsSorted(databaseAddressOrder(a.Addresses)) {
  226. log.Println("Warning: bug: addresses not correctly sorted in merge")
  227. a.Addresses = sortedAddressCopy(a.Addresses)
  228. }
  229. if !sort.IsSorted(databaseAddressOrder(b.Addresses)) {
  230. // no warning because this is the side we read from disk and it may
  231. // legitimately predate correct sorting.
  232. b.Addresses = sortedAddressCopy(b.Addresses)
  233. }
  234. res := DatabaseRecord{
  235. Addresses: make([]DatabaseAddress, 0, len(a.Addresses)+len(b.Addresses)),
  236. Seen: a.Seen,
  237. }
  238. if b.Seen > a.Seen {
  239. res.Seen = b.Seen
  240. }
  241. aIdx := 0
  242. bIdx := 0
  243. aAddrs := a.Addresses
  244. bAddrs := b.Addresses
  245. loop:
  246. for {
  247. switch {
  248. case aIdx == len(aAddrs) && bIdx == len(bAddrs):
  249. // both lists are exhausted, we are done
  250. break loop
  251. case aIdx == len(aAddrs):
  252. // a is exhausted, pick from b and continue
  253. res.Addresses = append(res.Addresses, bAddrs[bIdx])
  254. bIdx++
  255. continue
  256. case bIdx == len(bAddrs):
  257. // b is exhausted, pick from a and continue
  258. res.Addresses = append(res.Addresses, aAddrs[aIdx])
  259. aIdx++
  260. continue
  261. }
  262. // We have values left on both sides.
  263. aVal := aAddrs[aIdx]
  264. bVal := bAddrs[bIdx]
  265. switch {
  266. case aVal.Address == bVal.Address:
  267. // update for same address, pick newer
  268. if aVal.Expires > bVal.Expires {
  269. res.Addresses = append(res.Addresses, aVal)
  270. } else {
  271. res.Addresses = append(res.Addresses, bVal)
  272. }
  273. aIdx++
  274. bIdx++
  275. case aVal.Address < bVal.Address:
  276. // a is smallest, pick it and continue
  277. res.Addresses = append(res.Addresses, aVal)
  278. aIdx++
  279. default:
  280. // b is smallest, pick it and continue
  281. res.Addresses = append(res.Addresses, bVal)
  282. bIdx++
  283. }
  284. }
  285. return res
  286. }
  287. // expire returns the list of addresses after removing expired entries.
  288. // Expiration happen in place, so the slice given as the parameter is
  289. // destroyed. Internal order is not preserved.
  290. func expire(addrs []DatabaseAddress, now int64) []DatabaseAddress {
  291. i := 0
  292. for i < len(addrs) {
  293. if addrs[i].Expires < now {
  294. // This item is expired. Replace it with the last in the list
  295. // (noop if we are at the last item).
  296. addrs[i] = addrs[len(addrs)-1]
  297. // Wipe the last item of the list to release references to
  298. // strings and stuff.
  299. addrs[len(addrs)-1] = DatabaseAddress{}
  300. // Shorten the slice.
  301. addrs = addrs[:len(addrs)-1]
  302. continue
  303. }
  304. i++
  305. }
  306. return addrs
  307. }
  308. func sortedAddressCopy(addrs []DatabaseAddress) []DatabaseAddress {
  309. sorted := make([]DatabaseAddress, len(addrs))
  310. copy(sorted, addrs)
  311. sort.Sort(databaseAddressOrder(sorted))
  312. return sorted
  313. }
  314. type databaseAddressOrder []DatabaseAddress
  315. func (s databaseAddressOrder) Less(a, b int) bool {
  316. return s[a].Address < s[b].Address
  317. }
  318. func (s databaseAddressOrder) Swap(a, b int) {
  319. s[a], s[b] = s[b], s[a]
  320. }
  321. func (s databaseAddressOrder) Len() int {
  322. return len(s)
  323. }