| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- package main
- /*
- Locking
- =======
- The model has read and write locks. These must be acquired as appropriate by
- public methods. To prevent deadlock situations, private methods should never
- acquire locks, but document what locks they require.
- TODO(jb): Keep global and per node transfer and performance statistics.
- */
- import (
- "fmt"
- "os"
- "path"
- "sync"
- "time"
- "github.com/calmh/syncthing/buffers"
- "github.com/calmh/syncthing/protocol"
- )
- type Model struct {
- sync.RWMutex
- dir string
- updated int64
- global map[string]File // the latest version of each file as it exists in the cluster
- local map[string]File // the files we currently have locally on disk
- remote map[string]map[string]File
- need map[string]bool // the files we need to update
- nodes map[string]*protocol.Connection
- }
- const (
- RemoteFetchers = 4
- FlagDeleted = 1 << 12
- )
- func NewModel(dir string) *Model {
- m := &Model{
- dir: dir,
- global: make(map[string]File),
- local: make(map[string]File),
- remote: make(map[string]map[string]File),
- need: make(map[string]bool),
- nodes: make(map[string]*protocol.Connection),
- }
- go m.printStats()
- return m
- }
- func (m *Model) Start() {
- go m.puller()
- }
- func (m *Model) printStats() {
- for {
- time.Sleep(60 * time.Second)
- m.RLock()
- for node, conn := range m.nodes {
- stats := conn.Statistics()
- if (stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0) && stats.Latency > 0 {
- infof("%s: %sB/s in, %sB/s out, %0.02f ms", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec), stats.Latency.Seconds()*1000)
- } else if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 {
- infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec))
- }
- }
- m.RUnlock()
- }
- }
- func toSI(n int) string {
- if n > 1<<30 {
- return fmt.Sprintf("%.02f G", float64(n)/(1<<30))
- }
- if n > 1<<20 {
- return fmt.Sprintf("%.02f M", float64(n)/(1<<20))
- }
- if n > 1<<10 {
- return fmt.Sprintf("%.01f K", float64(n)/(1<<10))
- }
- return fmt.Sprintf("%d ", n)
- }
- func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
- m.Lock()
- defer m.Unlock()
- if opts.Debug.TraceNet {
- debugf("NET IDX(in): %s: %d files", nodeID, len(fs))
- }
- m.remote[nodeID] = make(map[string]File)
- for _, f := range fs {
- if f.Flags&FlagDeleted != 0 && !opts.Delete {
- // Files marked as deleted do not even enter the model
- continue
- }
- mf := File{
- Name: f.Name,
- Flags: f.Flags,
- Modified: int64(f.Modified),
- }
- var offset uint64
- for _, b := range f.Blocks {
- mf.Blocks = append(mf.Blocks, Block{
- Offset: offset,
- Length: b.Length,
- Hash: b.Hash,
- })
- offset += uint64(b.Length)
- }
- m.remote[nodeID][f.Name] = mf
- }
- m.recomputeGlobal()
- m.recomputeNeed()
- }
- func (m *Model) SeedIndex(fs []protocol.FileInfo) {
- m.Lock()
- defer m.Unlock()
- m.local = make(map[string]File)
- for _, f := range fs {
- mf := File{
- Name: f.Name,
- Flags: f.Flags,
- Modified: int64(f.Modified),
- }
- var offset uint64
- for _, b := range f.Blocks {
- mf.Blocks = append(mf.Blocks, Block{
- Offset: offset,
- Length: b.Length,
- Hash: b.Hash,
- })
- offset += uint64(b.Length)
- }
- m.local[f.Name] = mf
- }
- m.recomputeGlobal()
- m.recomputeNeed()
- }
- func (m *Model) Close(node string) {
- m.Lock()
- defer m.Unlock()
- if opts.Debug.TraceNet {
- debugf("NET CLOSE: %s", node)
- }
- delete(m.remote, node)
- delete(m.nodes, node)
- m.recomputeGlobal()
- m.recomputeNeed()
- }
- func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
- if opts.Debug.TraceNet && nodeID != "<local>" {
- debugf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
- }
- fn := path.Join(m.dir, name)
- fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
- if err != nil {
- return nil, err
- }
- defer fd.Close()
- buf := buffers.Get(int(size))
- _, err = fd.ReadAt(buf, int64(offset))
- if err != nil {
- return nil, err
- }
- return buf, nil
- }
- func (m *Model) RequestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
- m.RLock()
- nc := m.nodes[nodeID]
- m.RUnlock()
- if opts.Debug.TraceNet {
- debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
- }
- return nc.Request(name, offset, size, hash)
- }
- func (m *Model) ReplaceLocal(fs []File) {
- m.Lock()
- defer m.Unlock()
- var updated bool
- var newLocal = make(map[string]File)
- for _, f := range fs {
- newLocal[f.Name] = f
- if ef := m.local[f.Name]; ef.Modified != f.Modified {
- updated = true
- }
- }
- if m.markDeletedLocals(newLocal) {
- updated = true
- }
- if len(newLocal) != len(m.local) {
- updated = true
- }
- if updated {
- m.local = newLocal
- m.recomputeGlobal()
- m.recomputeNeed()
- m.updated = time.Now().Unix()
- m.broadcastIndex()
- }
- }
- // Must be called with the read lock held.
- func (m *Model) broadcastIndex() {
- idx := m.protocolIndex()
- for _, node := range m.nodes {
- node := node
- if opts.Debug.TraceNet {
- debugf("NET IDX(out): %s: %d files", node.ID, len(idx))
- }
- go node.Index(idx)
- }
- }
- // markDeletedLocals sets the deleted flag on files that have gone missing locally.
- // Must be called with the write lock held.
- func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
- // For every file in the existing local table, check if they are also
- // present in the new local table. If they are not, check that we already
- // had the newest version available according to the global table and if so
- // note the file as having been deleted.
- var updated bool
- for n, f := range m.local {
- if _, ok := newLocal[n]; !ok {
- if gf := m.global[n]; gf.Modified <= f.Modified {
- if f.Flags&FlagDeleted == 0 {
- f.Flags = FlagDeleted
- f.Modified = f.Modified + 1
- f.Blocks = nil
- updated = true
- }
- newLocal[n] = f
- }
- }
- }
- return updated
- }
- func (m *Model) UpdateLocal(f File) {
- m.Lock()
- defer m.Unlock()
- if ef, ok := m.local[f.Name]; !ok || ef.Modified != f.Modified {
- m.local[f.Name] = f
- m.recomputeGlobal()
- m.recomputeNeed()
- m.updated = time.Now().Unix()
- m.broadcastIndex()
- }
- }
- func (m *Model) Dir() string {
- m.RLock()
- defer m.RUnlock()
- return m.dir
- }
- func (m *Model) HaveFiles() []File {
- m.RLock()
- defer m.RUnlock()
- var files []File
- for _, file := range m.local {
- files = append(files, file)
- }
- return files
- }
- func (m *Model) LocalFile(name string) (File, bool) {
- m.RLock()
- defer m.RUnlock()
- f, ok := m.local[name]
- return f, ok
- }
- func (m *Model) GlobalFile(name string) (File, bool) {
- m.RLock()
- defer m.RUnlock()
- f, ok := m.global[name]
- return f, ok
- }
- // Must be called with the write lock held.
- func (m *Model) recomputeGlobal() {
- var newGlobal = make(map[string]File)
- for n, f := range m.local {
- newGlobal[n] = f
- }
- for _, fs := range m.remote {
- for n, f := range fs {
- if cf, ok := newGlobal[n]; !ok || cf.Modified < f.Modified {
- newGlobal[n] = f
- }
- }
- }
- m.global = newGlobal
- }
- // Must be called with the write lock held.
- func (m *Model) recomputeNeed() {
- m.need = make(map[string]bool)
- for n, f := range m.global {
- hf, ok := m.local[n]
- if !ok || f.Modified > hf.Modified {
- m.need[n] = true
- }
- }
- }
- // Must be called with the read lock held.
- func (m *Model) whoHas(name string) []string {
- var remote []string
- gf := m.global[name]
- for node, files := range m.remote {
- if file, ok := files[name]; ok && file.Modified == gf.Modified {
- remote = append(remote, node)
- }
- }
- return remote
- }
- func (m *Model) ConnectedTo(nodeID string) bool {
- m.RLock()
- defer m.RUnlock()
- _, ok := m.nodes[nodeID]
- return ok
- }
- func (m *Model) ProtocolIndex() []protocol.FileInfo {
- m.RLock()
- defer m.RUnlock()
- return m.protocolIndex()
- }
- // Must be called with the read lock held.
- func (m *Model) protocolIndex() []protocol.FileInfo {
- var index []protocol.FileInfo
- for _, f := range m.local {
- mf := protocol.FileInfo{
- Name: f.Name,
- Flags: f.Flags,
- Modified: int64(f.Modified),
- }
- for _, b := range f.Blocks {
- mf.Blocks = append(mf.Blocks, protocol.BlockInfo{
- Length: b.Length,
- Hash: b.Hash,
- })
- }
- if opts.Debug.TraceIdx {
- var flagComment string
- if mf.Flags&FlagDeleted != 0 {
- flagComment = " (deleted)"
- }
- debugf("IDX: %q m=%d f=%o%s (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, len(mf.Blocks))
- }
- index = append(index, mf)
- }
- return index
- }
- func (m *Model) AddNode(node *protocol.Connection) {
- m.Lock()
- m.nodes[node.ID] = node
- m.Unlock()
- m.RLock()
- idx := m.protocolIndex()
- m.RUnlock()
- if opts.Debug.TraceNet {
- debugf("NET IDX(out): %s: %d files", node.ID, len(idx))
- }
- // Sending the index might take a while if we have many files and a slow
- // uplink. Return from AddNode in the meantime.
- go node.Index(idx)
- }
|