Browse Source

lib/model: Emit LocalDiskUpdated events on detecting local changes

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3055
Nate Morrison 9 years ago
parent
commit
5a7fad0bcd

+ 3 - 2
cmd/syncthing/main.go

@@ -532,8 +532,9 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
 	errors := logger.NewRecorder(l, logger.LevelWarn, maxSystemErrors, 0)
 	systemLog := logger.NewRecorder(l, logger.LevelDebug, maxSystemLog, initialSystemLog)
 
-	// Event subscription for the API; must start early to catch the early events.
-	apiSub := events.NewBufferedSubscription(events.Default.Subscribe(events.AllEvents), 1000)
+	// Event subscription for the API; must start early to catch the early events.  The LocalDiskUpdated
+	// event might overwhelm the event reciever in some situations so we will not subscribe to it here.
+	apiSub := events.NewBufferedSubscription(events.Default.Subscribe(events.AllEvents&^events.LocalDiskUpdated), 1000)
 
 	if len(os.Getenv("GOMAXPROCS")) == 0 {
 		runtime.GOMAXPROCS(runtime.NumCPU())

+ 14 - 1
cmd/syncthing/verboseservice.go

@@ -72,15 +72,18 @@ func (s *verboseService) formatEvent(ev events.Event) string {
 
 	case events.Starting:
 		return fmt.Sprintf("Starting up (%s)", ev.Data.(map[string]string)["home"])
+
 	case events.StartupComplete:
 		return "Startup complete"
 
 	case events.DeviceDiscovered:
 		data := ev.Data.(map[string]interface{})
 		return fmt.Sprintf("Discovered device %v at %v", data["device"], data["addrs"])
+
 	case events.DeviceConnected:
 		data := ev.Data.(map[string]string)
 		return fmt.Sprintf("Connected to device %v at %v (type %s)", data["id"], data["addr"], data["type"])
+
 	case events.DeviceDisconnected:
 		data := ev.Data.(map[string]string)
 		return fmt.Sprintf("Disconnected from device %v", data["id"])
@@ -89,6 +92,10 @@ func (s *verboseService) formatEvent(ev events.Event) string {
 		data := ev.Data.(map[string]interface{})
 		return fmt.Sprintf("Folder %q is now %v", data["folder"], data["to"])
 
+	case events.LocalDiskUpdated:
+		data := ev.Data.(map[string]string)
+		return fmt.Sprintf("%s a %s: [ %s ]", data["action"], data["type"], data["path"])
+
 	case events.RemoteIndexUpdated:
 		data := ev.Data.(map[string]interface{})
 		return fmt.Sprintf("Device %v sent an index update for %q with %d items", data["device"], data["folder"], data["items"])
@@ -96,6 +103,7 @@ func (s *verboseService) formatEvent(ev events.Event) string {
 	case events.DeviceRejected:
 		data := ev.Data.(map[string]interface{})
 		return fmt.Sprintf("Rejected connection from device %v at %v", data["device"], data["address"])
+
 	case events.FolderRejected:
 		data := ev.Data.(map[string]string)
 		return fmt.Sprintf("Rejected unshared folder %q from device %v", data["folder"], data["device"])
@@ -103,6 +111,7 @@ func (s *verboseService) formatEvent(ev events.Event) string {
 	case events.ItemStarted:
 		data := ev.Data.(map[string]string)
 		return fmt.Sprintf("Started syncing %q / %q (%v %v)", data["folder"], data["item"], data["action"], data["type"])
+
 	case events.ItemFinished:
 		data := ev.Data.(map[string]interface{})
 		if err, ok := data["error"].(*string); ok && err != nil {
@@ -119,6 +128,7 @@ func (s *verboseService) formatEvent(ev events.Event) string {
 	case events.FolderCompletion:
 		data := ev.Data.(map[string]interface{})
 		return fmt.Sprintf("Completion for folder %q on device %v is %v%%", data["folder"], data["device"], data["completion"])
+
 	case events.FolderSummary:
 		data := ev.Data.(map[string]interface{})
 		sum := data["summary"].(map[string]interface{})
@@ -126,6 +136,7 @@ func (s *verboseService) formatEvent(ev events.Event) string {
 		delete(sum, "ignorePatterns")
 		delete(sum, "stateChanged")
 		return fmt.Sprintf("Summary for folder %q is %v", data["folder"], data["summary"])
+
 	case events.FolderScanProgress:
 		data := ev.Data.(map[string]interface{})
 		folder := data["folder"].(string)
@@ -142,16 +153,19 @@ func (s *verboseService) formatEvent(ev events.Event) string {
 		data := ev.Data.(map[string]string)
 		device := data["device"]
 		return fmt.Sprintf("Device %v was paused", device)
+
 	case events.DeviceResumed:
 		data := ev.Data.(map[string]string)
 		device := data["device"]
 		return fmt.Sprintf("Device %v was resumed", device)
+
 	case events.ListenAddressesChanged:
 		data := ev.Data.(map[string]interface{})
 		address := data["address"]
 		lan := data["lan"]
 		wan := data["wan"]
 		return fmt.Sprintf("Listen address %s resolution has changed: lan addresses: %s wan addresses: %s", address, lan, wan)
+
 	case events.LoginAttempt:
 		data := ev.Data.(map[string]interface{})
 		username := data["username"].(string)
@@ -162,7 +176,6 @@ func (s *verboseService) formatEvent(ev events.Event) string {
 			success = "failed"
 		}
 		return fmt.Sprintf("Login %s for username %s.", success, username)
-
 	}
 
 	return fmt.Sprintf("%s %#v", ev.Type, ev)

+ 3 - 0
lib/events/events.go

@@ -27,6 +27,7 @@ const (
 	DeviceRejected
 	DevicePaused
 	DeviceResumed
+	LocalDiskUpdated
 	LocalIndexUpdated
 	RemoteIndexUpdated
 	ItemStarted
@@ -61,6 +62,8 @@ func (t EventType) String() string {
 		return "DeviceDisconnected"
 	case DeviceRejected:
 		return "DeviceRejected"
+	case LocalDiskUpdated:
+		return "LocalDiskUpdated"
 	case LocalIndexUpdated:
 		return "LocalIndexUpdated"
 	case RemoteIndexUpdated:

+ 58 - 7
lib/model/model.go

@@ -1234,7 +1234,15 @@ func sendIndexTo(initial bool, minLocalVer int64, conn protocol.Connection, fold
 	return maxLocalVer, err
 }
 
-func (m *Model) updateLocals(folder string, fs []protocol.FileInfo) {
+func (m *Model) updateLocalsFromScanning(folder string, fs []protocol.FileInfo) {
+	m.updateLocals(folder, fs, true)
+}
+
+func (m *Model) updateLocalsFromPulling(folder string, fs []protocol.FileInfo) {
+	m.updateLocals(folder, fs, false)
+}
+
+func (m *Model) updateLocals(folder string, fs []protocol.FileInfo, fromScanning bool) {
 	m.fmut.RLock()
 	files := m.folderFiles[folder]
 	m.fmut.RUnlock()
@@ -1255,6 +1263,49 @@ func (m *Model) updateLocals(folder string, fs []protocol.FileInfo) {
 		"filenames": filenames,
 		"version":   files.LocalVersion(protocol.LocalDeviceID),
 	})
+
+	// Lets us know if file/folder change was originated locally or from a network
+	// sync update.  Now write these to a global log file.
+	if !fromScanning {
+		m.localDiskUpdated(m.folderCfgs[folder].Path(), fs)
+	}
+}
+
+func (m *Model) localDiskUpdated(path string, files []protocol.FileInfo) {
+	// For windows paths, strip unwanted chars from the front
+	path = strings.Replace(path, `\\?\`, "", 1)
+
+	for _, file := range files {
+		objType := "file"
+		action := "Modified"
+
+		// If our local vector is verison 1 AND it is the only version vector so far seen for this file then
+		// it is a new file.  Else if it is > 1 it's not new, and if it is 1 but another shortId version vector
+		// exists then it is new for us but created elsewhere so the file is still not new but modified by us.
+		// Only if it is truly new do we change this to 'added', else we leave it as 'modified'.
+		if len(file.Version) == 1 && file.Version[0].Value == 1 {
+			action = "Added"
+		}
+
+		if file.IsDirectory() {
+			objType = "dir"
+		}
+		if file.IsDeleted() {
+			action = "Deleted"
+		}
+
+		// If the file is a level or more deep then the forward slash seperator is embedded
+		// in the filename and makes the path look wierd on windows, so lets fix it
+		filename := filepath.FromSlash(file.Name)
+		// And append it to the filepath
+		path := filepath.Join(path, filename)
+
+		events.Default.Log(events.LocalDiskUpdated, map[string]string{
+			"action": action,
+			"type":   objType,
+			"path":   path,
+		})
+	}
 }
 
 func (m *Model) requestGlobal(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
@@ -1444,7 +1495,7 @@ func (m *Model) internalScanFolderSubdirs(folder string, subs []string) error {
 				l.Infof("Stopping folder %s mid-scan due to folder error: %s", folder, err)
 				return err
 			}
-			m.updateLocals(folder, batch)
+			m.updateLocalsFromPulling(folder, batch)
 			batch = batch[:0]
 			blocksHandled = 0
 		}
@@ -1456,7 +1507,7 @@ func (m *Model) internalScanFolderSubdirs(folder string, subs []string) error {
 		l.Infof("Stopping folder %s mid-scan due to folder error: %s", folder, err)
 		return err
 	} else if len(batch) > 0 {
-		m.updateLocals(folder, batch)
+		m.updateLocalsFromPulling(folder, batch)
 	}
 
 	if len(subs) == 0 {
@@ -1478,7 +1529,7 @@ func (m *Model) internalScanFolderSubdirs(folder string, subs []string) error {
 						iterError = err
 						return false
 					}
-					m.updateLocals(folder, batch)
+					m.updateLocalsFromPulling(folder, batch)
 					batch = batch[:0]
 				}
 
@@ -1530,7 +1581,7 @@ func (m *Model) internalScanFolderSubdirs(folder string, subs []string) error {
 		l.Infof("Stopping folder %s mid-scan due to folder error: %s", folder, err)
 		return err
 	} else if len(batch) > 0 {
-		m.updateLocals(folder, batch)
+		m.updateLocalsFromPulling(folder, batch)
 	}
 
 	runner.setState(FolderIdle)
@@ -1658,7 +1709,7 @@ func (m *Model) Override(folder string) {
 	fs.WithNeed(protocol.LocalDeviceID, func(fi db.FileIntf) bool {
 		need := fi.(protocol.FileInfo)
 		if len(batch) == indexBatchSize {
-			m.updateLocals(folder, batch)
+			m.updateLocalsFromPulling(folder, batch)
 			batch = batch[:0]
 		}
 
@@ -1678,7 +1729,7 @@ func (m *Model) Override(folder string) {
 		return true
 	})
 	if len(batch) > 0 {
-		m.updateLocals(folder, batch)
+		m.updateLocalsFromPulling(folder, batch)
 	}
 	runner.setState(FolderIdle)
 }

+ 3 - 1
lib/model/rwfolder.go

@@ -1398,7 +1398,9 @@ func (f *rwFolder) dbUpdaterRoutine() {
 			lastFile = job.file
 		}
 
-		f.model.updateLocals(f.folderID, files)
+		// All updates to file/folder objects that originated remotely
+		// (across the network) use this call to updateLocals
+		f.model.updateLocalsFromScanning(f.folderID, files)
 
 		if found {
 			f.model.receivedFile(f.folderID, lastFile)

+ 3 - 3
lib/model/rwfolder_test.go

@@ -62,7 +62,7 @@ func setUpModel(file protocol.FileInfo) *Model {
 	model := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db, nil)
 	model.AddFolder(defaultFolderConfig)
 	// Update index
-	model.updateLocals("default", []protocol.FileInfo{file})
+	model.updateLocalsFromPulling("default", []protocol.FileInfo{file})
 	return model
 }
 
@@ -255,7 +255,7 @@ func TestCopierCleanup(t *testing.T) {
 	file.Blocks = []protocol.BlockInfo{blocks[1]}
 	file.Version = file.Version.Update(protocol.LocalDeviceID.Short())
 	// Update index (removing old blocks)
-	m.updateLocals("default", []protocol.FileInfo{file})
+	m.updateLocalsFromPulling("default", []protocol.FileInfo{file})
 
 	if m.finder.Iterate(folders, blocks[0].Hash, iterFn) {
 		t.Error("Unexpected block found")
@@ -268,7 +268,7 @@ func TestCopierCleanup(t *testing.T) {
 	file.Blocks = []protocol.BlockInfo{blocks[0]}
 	file.Version = file.Version.Update(protocol.LocalDeviceID.Short())
 	// Update index (removing old blocks)
-	m.updateLocals("default", []protocol.FileInfo{file})
+	m.updateLocalsFromPulling("default", []protocol.FileInfo{file})
 
 	if !m.finder.Iterate(folders, blocks[0].Hash, iterFn) {
 		t.Error("Unexpected block found")

+ 1 - 1
lib/protocol/vector.go

@@ -98,7 +98,7 @@ func (v Vector) GreaterEqual(b Vector) bool {
 	return comp == Greater || comp == Equal
 }
 
-// Concurrent returns true when the two vectors are concrurrent.
+// Concurrent returns true when the two vectors are concurrent.
 func (v Vector) Concurrent(b Vector) bool {
 	comp := v.Compare(b)
 	return comp == ConcurrentGreater || comp == ConcurrentLesser