sorter.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. // Copyright (C) 2016 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "encoding/binary"
  9. "io/ioutil"
  10. "os"
  11. "sort"
  12. "github.com/syncthing/syncthing/lib/protocol"
  13. "github.com/syndtr/goleveldb/leveldb"
  14. "github.com/syndtr/goleveldb/leveldb/opt"
  15. )
  // maxBytesInMemory is the accumulated ProtoSize() of appended files, in
// bytes, at which the in-memory sorter starts reporting itself full
// (512 KiB).
const maxBytesInMemory = 512 * 1024
  19. // The IndexSorter sorts FileInfos based on their Sequence. You use it
  20. // by first Append()ing all entries to be sorted, then calling Sorted()
  21. // which will iterate over all the items in correctly sorted order.
  22. type IndexSorter interface {
  23. Append(f protocol.FileInfo)
  24. Sorted(fn func(f protocol.FileInfo) bool)
  25. Close()
  26. }
  27. type internalIndexSorter interface {
  28. IndexSorter
  29. full() bool
  30. copyTo(to IndexSorter)
  31. }
  32. // NewIndexSorter returns a new IndexSorter that will start out in memory
  33. // for efficiency but switch to on disk storage once the amount of data
  34. // becomes large.
  35. func NewIndexSorter(location string) IndexSorter {
  36. return &autoSwitchingIndexSorter{
  37. internalIndexSorter: newInMemoryIndexSorter(),
  38. location: location,
  39. }
  40. }
  41. // An autoSwitchingSorter starts out as an inMemorySorter but becomes an
  42. // onDiskSorter when the in memory sorter is full().
  43. type autoSwitchingIndexSorter struct {
  44. internalIndexSorter
  45. location string
  46. }
  47. func (s *autoSwitchingIndexSorter) Append(f protocol.FileInfo) {
  48. if s.internalIndexSorter.full() {
  49. // We spill before adding a file instead of after, to handle the
  50. // case where we're over max size but won't add any more files, in
  51. // which case we *don't* need to spill. An example of this would be
  52. // an index containing just a single large file.
  53. l.Debugf("sorter %p spills to disk", s)
  54. next := newOnDiskIndexSorter(s.location)
  55. s.internalIndexSorter.copyTo(next)
  56. s.internalIndexSorter = next
  57. }
  58. s.internalIndexSorter.Append(f)
  59. }
  60. // An inMemoryIndexSorter is simply a slice of FileInfos. The full() method
  61. // returns true when the number of files exceeds maxFiles or the total
  62. // number of blocks exceeds maxBlocks.
  63. type inMemoryIndexSorter struct {
  64. files []protocol.FileInfo
  65. bytes int
  66. maxBytes int
  67. }
  68. func newInMemoryIndexSorter() *inMemoryIndexSorter {
  69. return &inMemoryIndexSorter{
  70. maxBytes: maxBytesInMemory,
  71. }
  72. }
  73. func (s *inMemoryIndexSorter) Append(f protocol.FileInfo) {
  74. s.files = append(s.files, f)
  75. s.bytes += f.ProtoSize()
  76. }
  77. func (s *inMemoryIndexSorter) Sorted(fn func(protocol.FileInfo) bool) {
  78. sort.Sort(bySequence(s.files))
  79. for _, f := range s.files {
  80. if !fn(f) {
  81. break
  82. }
  83. }
  84. }
  85. func (s *inMemoryIndexSorter) Close() {
  86. }
  87. func (s *inMemoryIndexSorter) full() bool {
  88. return s.bytes >= s.maxBytes
  89. }
  90. func (s *inMemoryIndexSorter) copyTo(dst IndexSorter) {
  91. for _, f := range s.files {
  92. dst.Append(f)
  93. }
  94. }
  95. // bySequence sorts FileInfos by Sequence
  96. type bySequence []protocol.FileInfo
  97. func (l bySequence) Len() int {
  98. return len(l)
  99. }
  100. func (l bySequence) Swap(a, b int) {
  101. l[a], l[b] = l[b], l[a]
  102. }
  103. func (l bySequence) Less(a, b int) bool {
  104. return l[a].Sequence < l[b].Sequence
  105. }
  106. // An onDiskIndexSorter is backed by a LevelDB database in the temporary
  107. // directory. It relies on the fact that iterating over the database is done
  108. // in key order and uses the Sequence as key. When done with an
  109. // onDiskIndexSorter you must call Close() to remove the temporary database.
  110. type onDiskIndexSorter struct {
  111. db *leveldb.DB
  112. dir string
  113. }
  114. func newOnDiskIndexSorter(location string) *onDiskIndexSorter {
  115. // Set options to minimize resource usage.
  116. opts := &opt.Options{
  117. OpenFilesCacheCapacity: 10,
  118. WriteBuffer: 512 << 10,
  119. }
  120. // Use a temporary database directory.
  121. tmp, err := ioutil.TempDir(location, "tmp-index-sorter.")
  122. if err != nil {
  123. panic("creating temporary directory: " + err.Error())
  124. }
  125. db, err := leveldb.OpenFile(tmp, opts)
  126. if err != nil {
  127. panic("creating temporary database: " + err.Error())
  128. }
  129. s := &onDiskIndexSorter{
  130. db: db,
  131. dir: tmp,
  132. }
  133. l.Debugf("onDiskIndexSorter %p created at %s", s, tmp)
  134. return s
  135. }
  136. func (s *onDiskIndexSorter) Append(f protocol.FileInfo) {
  137. key := make([]byte, 8)
  138. binary.BigEndian.PutUint64(key[:], uint64(f.Sequence))
  139. data, err := f.Marshal()
  140. if err != nil {
  141. panic("bug: marshalling FileInfo should never fail: " + err.Error())
  142. }
  143. err = s.db.Put(key, data, nil)
  144. if err != nil {
  145. panic("writing to temporary database: " + err.Error())
  146. }
  147. }
  148. func (s *onDiskIndexSorter) Sorted(fn func(protocol.FileInfo) bool) {
  149. it := s.db.NewIterator(nil, nil)
  150. defer it.Release()
  151. for it.Next() {
  152. var f protocol.FileInfo
  153. if err := f.Unmarshal(it.Value()); err != nil {
  154. panic("unmarshal failed: " + err.Error())
  155. }
  156. if !fn(f) {
  157. break
  158. }
  159. }
  160. }
  161. func (s *onDiskIndexSorter) Close() {
  162. l.Debugf("onDiskIndexSorter %p closes", s)
  163. s.db.Close()
  164. os.RemoveAll(s.dir)
  165. }
  166. func (s *onDiskIndexSorter) full() bool {
  167. return false
  168. }
  169. func (s *onDiskIndexSorter) copyTo(dst IndexSorter) {
  170. // Just wrap Sorted() if we need to support this in the future.
  171. panic("unsupported")
  172. }