| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 | 
							- // Copyright (C) 2018 The Syncthing Authors.
 
- //
 
- // This Source Code Form is subject to the terms of the Mozilla Public
 
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
 
- // You can obtain one at https://mozilla.org/MPL/2.0/.
 
- //go:generate go run ../../proto/scripts/protofmt.go database.proto
 
- //go:generate protoc -I ../../ -I . --gogofast_out=. database.proto
 
- package main
 
- import (
 
- 	"context"
 
- 	"log"
 
- 	"sort"
 
- 	"time"
 
- 	"github.com/syndtr/goleveldb/leveldb"
 
- 	"github.com/syndtr/goleveldb/leveldb/util"
 
- )
 
// clock is the time source the store reads the current time from. Keeping it
// behind an interface means the store does not have to call time.Now
// directly.
type clock interface {
	// Now reports the current time according to this clock.
	Now() time.Time
}
 
// defaultClock is the clock implementation backed by the system time.
type defaultClock struct{}

// Now returns the current time as reported by time.Now.
func (defaultClock) Now() time.Time {
	now := time.Now()
	return now
}
 
- type database interface {
 
- 	put(key string, rec DatabaseRecord) error
 
- 	merge(key string, addrs []DatabaseAddress, seen int64) error
 
- 	get(key string) (DatabaseRecord, error)
 
- }
 
- type levelDBStore struct {
 
- 	db         *leveldb.DB
 
- 	inbox      chan func()
 
- 	clock      clock
 
- 	marshalBuf []byte
 
- }
 
- func newLevelDBStore(dir string) (*levelDBStore, error) {
 
- 	db, err := leveldb.OpenFile(dir, levelDBOptions)
 
- 	if err != nil {
 
- 		return nil, err
 
- 	}
 
- 	return &levelDBStore{
 
- 		db:    db,
 
- 		inbox: make(chan func(), 16),
 
- 		clock: defaultClock{},
 
- 	}, nil
 
- }
 
- func (s *levelDBStore) put(key string, rec DatabaseRecord) error {
 
- 	t0 := time.Now()
 
- 	defer func() {
 
- 		databaseOperationSeconds.WithLabelValues(dbOpPut).Observe(time.Since(t0).Seconds())
 
- 	}()
 
- 	rc := make(chan error)
 
- 	s.inbox <- func() {
 
- 		size := rec.Size()
 
- 		if len(s.marshalBuf) < size {
 
- 			s.marshalBuf = make([]byte, size)
 
- 		}
 
- 		n, _ := rec.MarshalTo(s.marshalBuf)
 
- 		rc <- s.db.Put([]byte(key), s.marshalBuf[:n], nil)
 
- 	}
 
- 	err := <-rc
 
- 	if err != nil {
 
- 		databaseOperations.WithLabelValues(dbOpPut, dbResError).Inc()
 
- 	} else {
 
- 		databaseOperations.WithLabelValues(dbOpPut, dbResSuccess).Inc()
 
- 	}
 
- 	return err
 
- }
 
- func (s *levelDBStore) merge(key string, addrs []DatabaseAddress, seen int64) error {
 
- 	t0 := time.Now()
 
- 	defer func() {
 
- 		databaseOperationSeconds.WithLabelValues(dbOpMerge).Observe(time.Since(t0).Seconds())
 
- 	}()
 
- 	rc := make(chan error)
 
- 	newRec := DatabaseRecord{
 
- 		Addresses: addrs,
 
- 		Seen:      seen,
 
- 	}
 
- 	s.inbox <- func() {
 
- 		// grab the existing record
 
- 		oldRec, err := s.get(key)
 
- 		if err != nil {
 
- 			// "not found" is not an error from get, so this is serious
 
- 			// stuff only
 
- 			rc <- err
 
- 			return
 
- 		}
 
- 		newRec = merge(newRec, oldRec)
 
- 		// We replicate s.put() functionality here ourselves instead of
 
- 		// calling it because we want to serialize our get above together
 
- 		// with the put in the same function.
 
- 		size := newRec.Size()
 
- 		if len(s.marshalBuf) < size {
 
- 			s.marshalBuf = make([]byte, size)
 
- 		}
 
- 		n, _ := newRec.MarshalTo(s.marshalBuf)
 
- 		rc <- s.db.Put([]byte(key), s.marshalBuf[:n], nil)
 
- 	}
 
- 	err := <-rc
 
- 	if err != nil {
 
- 		databaseOperations.WithLabelValues(dbOpMerge, dbResError).Inc()
 
- 	} else {
 
- 		databaseOperations.WithLabelValues(dbOpMerge, dbResSuccess).Inc()
 
- 	}
 
- 	return err
 
- }
 
- func (s *levelDBStore) get(key string) (DatabaseRecord, error) {
 
- 	t0 := time.Now()
 
- 	defer func() {
 
- 		databaseOperationSeconds.WithLabelValues(dbOpGet).Observe(time.Since(t0).Seconds())
 
- 	}()
 
- 	keyBs := []byte(key)
 
- 	val, err := s.db.Get(keyBs, nil)
 
- 	if err == leveldb.ErrNotFound {
 
- 		databaseOperations.WithLabelValues(dbOpGet, dbResNotFound).Inc()
 
- 		return DatabaseRecord{}, nil
 
- 	}
 
- 	if err != nil {
 
- 		databaseOperations.WithLabelValues(dbOpGet, dbResError).Inc()
 
- 		return DatabaseRecord{}, err
 
- 	}
 
- 	var rec DatabaseRecord
 
- 	if err := rec.Unmarshal(val); err != nil {
 
- 		databaseOperations.WithLabelValues(dbOpGet, dbResUnmarshalError).Inc()
 
- 		return DatabaseRecord{}, nil
 
- 	}
 
- 	rec.Addresses = expire(rec.Addresses, s.clock.Now().UnixNano())
 
- 	databaseOperations.WithLabelValues(dbOpGet, dbResSuccess).Inc()
 
- 	return rec, nil
 
- }
 
- func (s *levelDBStore) Serve(ctx context.Context) error {
 
- 	t := time.NewTimer(0)
 
- 	defer t.Stop()
 
- 	defer s.db.Close()
 
- 	// Start the statistics serve routine. It will exit with us when
 
- 	// statisticsTrigger is closed.
 
- 	statisticsTrigger := make(chan struct{})
 
- 	statisticsDone := make(chan struct{})
 
- 	go s.statisticsServe(statisticsTrigger, statisticsDone)
 
- loop:
 
- 	for {
 
- 		select {
 
- 		case fn := <-s.inbox:
 
- 			// Run function in serialized order.
 
- 			fn()
 
- 		case <-t.C:
 
- 			// Trigger the statistics routine to do its thing in the
 
- 			// background.
 
- 			statisticsTrigger <- struct{}{}
 
- 		case <-statisticsDone:
 
- 			// The statistics routine is done with one iteratation, schedule
 
- 			// the next.
 
- 			t.Reset(databaseStatisticsInterval)
 
- 		case <-ctx.Done():
 
- 			// We're done.
 
- 			close(statisticsTrigger)
 
- 			break loop
 
- 		}
 
- 	}
 
- 	// Also wait for statisticsServe to return
 
- 	<-statisticsDone
 
- 	return nil
 
- }
 
- func (s *levelDBStore) statisticsServe(trigger <-chan struct{}, done chan<- struct{}) {
 
- 	defer close(done)
 
- 	for range trigger {
 
- 		t0 := time.Now()
 
- 		nowNanos := t0.UnixNano()
 
- 		cutoff24h := t0.Add(-24 * time.Hour).UnixNano()
 
- 		cutoff1w := t0.Add(-7 * 24 * time.Hour).UnixNano()
 
- 		cutoff2Mon := t0.Add(-60 * 24 * time.Hour).UnixNano()
 
- 		current, last24h, last1w, inactive, errors := 0, 0, 0, 0, 0
 
- 		iter := s.db.NewIterator(&util.Range{}, nil)
 
- 		for iter.Next() {
 
- 			// Attempt to unmarshal the record and count the
 
- 			// failure if there's something wrong with it.
 
- 			var rec DatabaseRecord
 
- 			if err := rec.Unmarshal(iter.Value()); err != nil {
 
- 				errors++
 
- 				continue
 
- 			}
 
- 			// If there are addresses that have not expired it's a current
 
- 			// record, otherwise account it based on when it was last seen
 
- 			// (last 24 hours or last week) or finally as inactice.
 
- 			switch {
 
- 			case len(expire(rec.Addresses, nowNanos)) > 0:
 
- 				current++
 
- 			case rec.Seen > cutoff24h:
 
- 				last24h++
 
- 			case rec.Seen > cutoff1w:
 
- 				last1w++
 
- 			case rec.Seen > cutoff2Mon:
 
- 				inactive++
 
- 			case rec.Missed < cutoff2Mon:
 
- 				// It hasn't been seen lately and we haven't recorded
 
- 				// someone asking for this device in a long time either;
 
- 				// delete the record.
 
- 				if err := s.db.Delete(iter.Key(), nil); err != nil {
 
- 					databaseOperations.WithLabelValues(dbOpDelete, dbResError).Inc()
 
- 				} else {
 
- 					databaseOperations.WithLabelValues(dbOpDelete, dbResSuccess).Inc()
 
- 				}
 
- 			default:
 
- 				inactive++
 
- 			}
 
- 		}
 
- 		iter.Release()
 
- 		databaseKeys.WithLabelValues("current").Set(float64(current))
 
- 		databaseKeys.WithLabelValues("last24h").Set(float64(last24h))
 
- 		databaseKeys.WithLabelValues("last1w").Set(float64(last1w))
 
- 		databaseKeys.WithLabelValues("inactive").Set(float64(inactive))
 
- 		databaseKeys.WithLabelValues("error").Set(float64(errors))
 
- 		databaseStatisticsSeconds.Set(time.Since(t0).Seconds())
 
- 		// Signal that we are done and can be scheduled again.
 
- 		done <- struct{}{}
 
- 	}
 
- }
 
- // merge returns the merged result of the two database records a and b. The
 
- // result is the union of the two address sets, with the newer expiry time
 
- // chosen for any duplicates.
 
- func merge(a, b DatabaseRecord) DatabaseRecord {
 
- 	// Both lists must be sorted for this to work.
 
- 	if !sort.IsSorted(databaseAddressOrder(a.Addresses)) {
 
- 		log.Println("Warning: bug: addresses not correctly sorted in merge")
 
- 		a.Addresses = sortedAddressCopy(a.Addresses)
 
- 	}
 
- 	if !sort.IsSorted(databaseAddressOrder(b.Addresses)) {
 
- 		// no warning because this is the side we read from disk and it may
 
- 		// legitimately predate correct sorting.
 
- 		b.Addresses = sortedAddressCopy(b.Addresses)
 
- 	}
 
- 	res := DatabaseRecord{
 
- 		Addresses: make([]DatabaseAddress, 0, len(a.Addresses)+len(b.Addresses)),
 
- 		Seen:      a.Seen,
 
- 	}
 
- 	if b.Seen > a.Seen {
 
- 		res.Seen = b.Seen
 
- 	}
 
- 	aIdx := 0
 
- 	bIdx := 0
 
- 	aAddrs := a.Addresses
 
- 	bAddrs := b.Addresses
 
- loop:
 
- 	for {
 
- 		switch {
 
- 		case aIdx == len(aAddrs) && bIdx == len(bAddrs):
 
- 			// both lists are exhausted, we are done
 
- 			break loop
 
- 		case aIdx == len(aAddrs):
 
- 			// a is exhausted, pick from b and continue
 
- 			res.Addresses = append(res.Addresses, bAddrs[bIdx])
 
- 			bIdx++
 
- 			continue
 
- 		case bIdx == len(bAddrs):
 
- 			// b is exhausted, pick from a and continue
 
- 			res.Addresses = append(res.Addresses, aAddrs[aIdx])
 
- 			aIdx++
 
- 			continue
 
- 		}
 
- 		// We have values left on both sides.
 
- 		aVal := aAddrs[aIdx]
 
- 		bVal := bAddrs[bIdx]
 
- 		switch {
 
- 		case aVal.Address == bVal.Address:
 
- 			// update for same address, pick newer
 
- 			if aVal.Expires > bVal.Expires {
 
- 				res.Addresses = append(res.Addresses, aVal)
 
- 			} else {
 
- 				res.Addresses = append(res.Addresses, bVal)
 
- 			}
 
- 			aIdx++
 
- 			bIdx++
 
- 		case aVal.Address < bVal.Address:
 
- 			// a is smallest, pick it and continue
 
- 			res.Addresses = append(res.Addresses, aVal)
 
- 			aIdx++
 
- 		default:
 
- 			// b is smallest, pick it and continue
 
- 			res.Addresses = append(res.Addresses, bVal)
 
- 			bIdx++
 
- 		}
 
- 	}
 
- 	return res
 
- }
 
- // expire returns the list of addresses after removing expired entries.
 
- // Expiration happen in place, so the slice given as the parameter is
 
- // destroyed. Internal order is not preserved.
 
- func expire(addrs []DatabaseAddress, now int64) []DatabaseAddress {
 
- 	i := 0
 
- 	for i < len(addrs) {
 
- 		if addrs[i].Expires < now {
 
- 			// This item is expired. Replace it with the last in the list
 
- 			// (noop if we are at the last item).
 
- 			addrs[i] = addrs[len(addrs)-1]
 
- 			// Wipe the last item of the list to release references to
 
- 			// strings and stuff.
 
- 			addrs[len(addrs)-1] = DatabaseAddress{}
 
- 			// Shorten the slice.
 
- 			addrs = addrs[:len(addrs)-1]
 
- 			continue
 
- 		}
 
- 		i++
 
- 	}
 
- 	return addrs
 
- }
 
- func sortedAddressCopy(addrs []DatabaseAddress) []DatabaseAddress {
 
- 	sorted := make([]DatabaseAddress, len(addrs))
 
- 	copy(sorted, addrs)
 
- 	sort.Sort(databaseAddressOrder(sorted))
 
- 	return sorted
 
- }
 
- type databaseAddressOrder []DatabaseAddress
 
- func (s databaseAddressOrder) Less(a, b int) bool {
 
- 	return s[a].Address < s[b].Address
 
- }
 
- func (s databaseAddressOrder) Swap(a, b int) {
 
- 	s[a], s[b] = s[b], s[a]
 
- }
 
- func (s databaseAddressOrder) Len() int {
 
- 	return len(s)
 
- }
 
 
  |