|
@@ -78,7 +78,7 @@ func newInMemoryStore(dir string, flushInterval time.Duration, s3 *s3Copier) *in
|
|
|
log.Println("Error reading database:", err)
|
|
log.Println("Error reading database:", err)
|
|
|
}
|
|
}
|
|
|
log.Printf("Read %d records from database", nr)
|
|
log.Printf("Read %d records from database", nr)
|
|
|
- s.calculateStatistics()
|
|
|
|
|
|
|
+ s.expireAndCalculateStatistics()
|
|
|
return s
|
|
return s
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -99,7 +99,7 @@ func (s *inMemoryStore) merge(key *protocol.DeviceID, addrs []DatabaseAddress, s
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
oldRec, _ := s.m.Load(*key)
|
|
oldRec, _ := s.m.Load(*key)
|
|
|
- newRec = merge(newRec, oldRec)
|
|
|
|
|
|
|
+ newRec = merge(oldRec, newRec)
|
|
|
s.m.Store(*key, newRec)
|
|
s.m.Store(*key, newRec)
|
|
|
|
|
|
|
|
databaseOperations.WithLabelValues(dbOpMerge, dbResSuccess).Inc()
|
|
databaseOperations.WithLabelValues(dbOpMerge, dbResSuccess).Inc()
|
|
@@ -126,19 +126,20 @@ func (s *inMemoryStore) get(key *protocol.DeviceID) (DatabaseRecord, error) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *inMemoryStore) Serve(ctx context.Context) error {
|
|
func (s *inMemoryStore) Serve(ctx context.Context) error {
|
|
|
- t := time.NewTimer(s.flushInterval)
|
|
|
|
|
- defer t.Stop()
|
|
|
|
|
-
|
|
|
|
|
if s.flushInterval <= 0 {
|
|
if s.flushInterval <= 0 {
|
|
|
- t.Stop()
|
|
|
|
|
|
|
+ <-ctx.Done()
|
|
|
|
|
+ return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ t := time.NewTimer(s.flushInterval)
|
|
|
|
|
+ defer t.Stop()
|
|
|
|
|
+
|
|
|
loop:
|
|
loop:
|
|
|
for {
|
|
for {
|
|
|
select {
|
|
select {
|
|
|
case <-t.C:
|
|
case <-t.C:
|
|
|
log.Println("Calculating statistics")
|
|
log.Println("Calculating statistics")
|
|
|
- s.calculateStatistics()
|
|
|
|
|
|
|
+ s.expireAndCalculateStatistics()
|
|
|
log.Println("Flushing database")
|
|
log.Println("Flushing database")
|
|
|
if err := s.write(); err != nil {
|
|
if err := s.write(); err != nil {
|
|
|
log.Println("Error writing database:", err)
|
|
log.Println("Error writing database:", err)
|
|
@@ -155,7 +156,7 @@ loop:
|
|
|
return s.write()
|
|
return s.write()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (s *inMemoryStore) calculateStatistics() {
|
|
|
|
|
|
|
+func (s *inMemoryStore) expireAndCalculateStatistics() {
|
|
|
now := s.clock.Now()
|
|
now := s.clock.Now()
|
|
|
cutoff24h := now.Add(-24 * time.Hour).UnixNano()
|
|
cutoff24h := now.Add(-24 * time.Hour).UnixNano()
|
|
|
cutoff1w := now.Add(-7 * 24 * time.Hour).UnixNano()
|
|
cutoff1w := now.Add(-7 * 24 * time.Hour).UnixNano()
|
|
@@ -360,69 +361,36 @@ func (s *inMemoryStore) read() (int, error) {
|
|
|
|
|
|
|
|
// merge returns the merged result of the two database records a and b. The
|
|
// merge returns the merged result of the two database records a and b. The
|
|
|
// result is the union of the two address sets, with the newer expiry time
|
|
// result is the union of the two address sets, with the newer expiry time
|
|
|
-// chosen for any duplicates.
|
|
|
|
|
|
|
+// chosen for any duplicates. The address list in a is overwritten and
|
|
|
|
|
+// reused for the result.
|
|
|
func merge(a, b DatabaseRecord) DatabaseRecord {
|
|
func merge(a, b DatabaseRecord) DatabaseRecord {
|
|
|
// Both lists must be sorted for this to work.
|
|
// Both lists must be sorted for this to work.
|
|
|
|
|
|
|
|
- res := DatabaseRecord{
|
|
|
|
|
- Addresses: make([]DatabaseAddress, 0, max(len(a.Addresses), len(b.Addresses))),
|
|
|
|
|
- Seen: a.Seen,
|
|
|
|
|
- }
|
|
|
|
|
- if b.Seen > a.Seen {
|
|
|
|
|
- res.Seen = b.Seen
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ a.Seen = max(a.Seen, b.Seen)
|
|
|
|
|
|
|
|
aIdx := 0
|
|
aIdx := 0
|
|
|
bIdx := 0
|
|
bIdx := 0
|
|
|
- aAddrs := a.Addresses
|
|
|
|
|
- bAddrs := b.Addresses
|
|
|
|
|
-loop:
|
|
|
|
|
- for {
|
|
|
|
|
- switch {
|
|
|
|
|
- case aIdx == len(aAddrs) && bIdx == len(bAddrs):
|
|
|
|
|
- // both lists are exhausted, we are done
|
|
|
|
|
- break loop
|
|
|
|
|
-
|
|
|
|
|
- case aIdx == len(aAddrs):
|
|
|
|
|
- // a is exhausted, pick from b and continue
|
|
|
|
|
- res.Addresses = append(res.Addresses, bAddrs[bIdx])
|
|
|
|
|
- bIdx++
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- case bIdx == len(bAddrs):
|
|
|
|
|
- // b is exhausted, pick from a and continue
|
|
|
|
|
- res.Addresses = append(res.Addresses, aAddrs[aIdx])
|
|
|
|
|
- aIdx++
|
|
|
|
|
- continue
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // We have values left on both sides.
|
|
|
|
|
- aVal := aAddrs[aIdx]
|
|
|
|
|
- bVal := bAddrs[bIdx]
|
|
|
|
|
-
|
|
|
|
|
- switch {
|
|
|
|
|
- case aVal.Address == bVal.Address:
|
|
|
|
|
- // update for same address, pick newer
|
|
|
|
|
- if aVal.Expires > bVal.Expires {
|
|
|
|
|
- res.Addresses = append(res.Addresses, aVal)
|
|
|
|
|
- } else {
|
|
|
|
|
- res.Addresses = append(res.Addresses, bVal)
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ for aIdx < len(a.Addresses) && bIdx < len(b.Addresses) {
|
|
|
|
|
+ switch cmp.Compare(a.Addresses[aIdx].Address, b.Addresses[bIdx].Address) {
|
|
|
|
|
+ case 0:
|
|
|
|
|
+ // a == b, choose the newer expiry time
|
|
|
|
|
+ a.Addresses[aIdx].Expires = max(a.Addresses[aIdx].Expires, b.Addresses[bIdx].Expires)
|
|
|
aIdx++
|
|
aIdx++
|
|
|
bIdx++
|
|
bIdx++
|
|
|
-
|
|
|
|
|
- case aVal.Address < bVal.Address:
|
|
|
|
|
- // a is smallest, pick it and continue
|
|
|
|
|
- res.Addresses = append(res.Addresses, aVal)
|
|
|
|
|
|
|
+ case -1:
|
|
|
|
|
+ // a < b, keep a and move on
|
|
|
aIdx++
|
|
aIdx++
|
|
|
-
|
|
|
|
|
- default:
|
|
|
|
|
- // b is smallest, pick it and continue
|
|
|
|
|
- res.Addresses = append(res.Addresses, bVal)
|
|
|
|
|
|
|
+ case 1:
|
|
|
|
|
+ // a > b, insert b before a
|
|
|
|
|
+ a.Addresses = append(a.Addresses[:aIdx], append([]DatabaseAddress{b.Addresses[bIdx]}, a.Addresses[aIdx:]...)...)
|
|
|
bIdx++
|
|
bIdx++
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- return res
|
|
|
|
|
|
|
+ if bIdx < len(b.Addresses) {
|
|
|
|
|
+ a.Addresses = append(a.Addresses, b.Addresses[bIdx:]...)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return a
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// expire returns the list of addresses after removing expired entries.
|
|
// expire returns the list of addresses after removing expired entries.
|
|
@@ -432,6 +400,10 @@ func expire(addrs []DatabaseAddress, now time.Time) []DatabaseAddress {
|
|
|
cutoff := now.UnixNano()
|
|
cutoff := now.UnixNano()
|
|
|
naddrs := addrs[:0]
|
|
naddrs := addrs[:0]
|
|
|
for i := range addrs {
|
|
for i := range addrs {
|
|
|
|
|
+ if i > 0 && addrs[i].Address == addrs[i-1].Address {
|
|
|
|
|
+ // Skip duplicates
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
if addrs[i].Expires >= cutoff {
|
|
if addrs[i].Expires >= cutoff {
|
|
|
naddrs = append(naddrs, addrs[i])
|
|
naddrs = append(naddrs, addrs[i])
|
|
|
}
|
|
}
|