observed.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. // Copyright (C) 2020 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "fmt"
  9. "strings"
  10. "time"
  11. "google.golang.org/protobuf/proto"
  12. "google.golang.org/protobuf/types/known/timestamppb"
  13. "github.com/syncthing/syncthing/internal/gen/dbproto"
  14. "github.com/syncthing/syncthing/lib/protocol"
  15. )
  16. type ObservedDB struct {
  17. kv KV
  18. }
  19. func NewObservedDB(kv KV) *ObservedDB {
  20. return &ObservedDB{kv: kv}
  21. }
  22. type ObservedFolder struct {
  23. Time time.Time `json:"time"`
  24. Label string `json:"label"`
  25. ReceiveEncrypted bool `json:"receiveEncrypted"`
  26. RemoteEncrypted bool `json:"remoteEncrypted"`
  27. }
  28. func (o *ObservedFolder) toWire() *dbproto.ObservedFolder {
  29. return &dbproto.ObservedFolder{
  30. Time: timestamppb.New(o.Time),
  31. Label: o.Label,
  32. ReceiveEncrypted: o.ReceiveEncrypted,
  33. RemoteEncrypted: o.RemoteEncrypted,
  34. }
  35. }
  36. func (o *ObservedFolder) fromWire(w *dbproto.ObservedFolder) {
  37. o.Time = w.GetTime().AsTime()
  38. o.Label = w.GetLabel()
  39. o.ReceiveEncrypted = w.GetReceiveEncrypted()
  40. o.RemoteEncrypted = w.GetRemoteEncrypted()
  41. }
  42. type ObservedDevice struct {
  43. Time time.Time `json:"time"`
  44. Name string `json:"name"`
  45. Address string `json:"address"`
  46. }
  47. func (o *ObservedDevice) fromWire(w *dbproto.ObservedDevice) {
  48. o.Time = w.GetTime().AsTime()
  49. o.Name = w.GetName()
  50. o.Address = w.GetAddress()
  51. }
  52. func (db *ObservedDB) AddOrUpdatePendingDevice(device protocol.DeviceID, name, address string) error {
  53. key := "device/" + device.String()
  54. od := &dbproto.ObservedDevice{
  55. Time: timestamppb.New(time.Now().Truncate(time.Second)),
  56. Name: name,
  57. Address: address,
  58. }
  59. return db.kv.PutKV(key, mustMarshal(od))
  60. }
  61. func (db *ObservedDB) RemovePendingDevice(device protocol.DeviceID) error {
  62. key := "device/" + device.String()
  63. return db.kv.DeleteKV(key)
  64. }
  65. // PendingDevices enumerates all entries. Invalid ones are dropped from the database
  66. // after a warning log message, as a side-effect.
  67. func (db *ObservedDB) PendingDevices() (map[protocol.DeviceID]ObservedDevice, error) {
  68. res := make(map[protocol.DeviceID]ObservedDevice)
  69. it, errFn := db.kv.PrefixKV("device/")
  70. for kv := range it {
  71. _, keyDev, ok := strings.Cut(kv.Key, "/")
  72. if !ok {
  73. if err := db.kv.DeleteKV(kv.Key); err != nil {
  74. return nil, fmt.Errorf("delete invalid pending device: %w", err)
  75. }
  76. continue
  77. }
  78. deviceID, err := protocol.DeviceIDFromString(keyDev)
  79. var protoD dbproto.ObservedDevice
  80. var od ObservedDevice
  81. if err != nil {
  82. goto deleteKey
  83. }
  84. if err = proto.Unmarshal(kv.Value, &protoD); err != nil {
  85. goto deleteKey
  86. }
  87. od.fromWire(&protoD)
  88. res[deviceID] = od
  89. continue
  90. deleteKey:
  91. // Deleting invalid entries is the only possible "repair" measure and
  92. // appropriate for the importance of pending entries. They will come back
  93. // soon if still relevant.
  94. if err := db.kv.DeleteKV(kv.Key); err != nil {
  95. return nil, fmt.Errorf("delete invalid pending device: %w", err)
  96. }
  97. }
  98. return res, errFn()
  99. }
  100. func (db *ObservedDB) AddOrUpdatePendingFolder(id string, of ObservedFolder, device protocol.DeviceID) error {
  101. key := "folder/" + device.String() + "/" + id
  102. return db.kv.PutKV(key, mustMarshal(of.toWire()))
  103. }
  104. // RemovePendingFolderForDevice removes entries for specific folder / device combinations.
  105. func (db *ObservedDB) RemovePendingFolderForDevice(id string, device protocol.DeviceID) error {
  106. key := "folder/" + device.String() + "/" + id
  107. return db.kv.DeleteKV(key)
  108. }
  109. // RemovePendingFolder removes all entries matching a specific folder ID.
  110. func (db *ObservedDB) RemovePendingFolder(id string) error {
  111. it, errFn := db.kv.PrefixKV("folder/")
  112. for kv := range it {
  113. parts := strings.Split(kv.Key, "/")
  114. if len(parts) != 3 || parts[2] != id {
  115. continue
  116. }
  117. if err := db.kv.DeleteKV(kv.Key); err != nil {
  118. return fmt.Errorf("delete pending folder: %w", err)
  119. }
  120. }
  121. return errFn()
  122. }
  123. // Consolidated information about a pending folder
  124. type PendingFolder struct {
  125. OfferedBy map[protocol.DeviceID]ObservedFolder `json:"offeredBy"`
  126. }
  127. func (db *ObservedDB) PendingFolders() (map[string]PendingFolder, error) {
  128. return db.PendingFoldersForDevice(protocol.EmptyDeviceID)
  129. }
  130. // PendingFoldersForDevice enumerates only entries matching the given device ID, unless it
  131. // is EmptyDeviceID. Invalid ones are dropped from the database after a info log
  132. // message, as a side-effect.
  133. func (db *ObservedDB) PendingFoldersForDevice(device protocol.DeviceID) (map[string]PendingFolder, error) {
  134. prefix := "folder/"
  135. if device != protocol.EmptyDeviceID {
  136. prefix += device.String() + "/"
  137. }
  138. res := make(map[string]PendingFolder)
  139. it, errFn := db.kv.PrefixKV(prefix)
  140. for kv := range it {
  141. parts := strings.Split(kv.Key, "/")
  142. if len(parts) != 3 {
  143. continue
  144. }
  145. keyDev := parts[1]
  146. deviceID, err := protocol.DeviceIDFromString(keyDev)
  147. var protoF dbproto.ObservedFolder
  148. var of ObservedFolder
  149. var folderID string
  150. if err != nil {
  151. goto deleteKey
  152. }
  153. if folderID = parts[2]; len(folderID) < 1 {
  154. goto deleteKey
  155. }
  156. if err = proto.Unmarshal(kv.Value, &protoF); err != nil {
  157. goto deleteKey
  158. }
  159. if _, ok := res[folderID]; !ok {
  160. res[folderID] = PendingFolder{
  161. OfferedBy: map[protocol.DeviceID]ObservedFolder{},
  162. }
  163. }
  164. of.fromWire(&protoF)
  165. res[folderID].OfferedBy[deviceID] = of
  166. continue
  167. deleteKey:
  168. // Deleting invalid entries is the only possible "repair" measure and
  169. // appropriate for the importance of pending entries. They will come back
  170. // soon if still relevant.
  171. if err := db.kv.DeleteKV(kv.Key); err != nil {
  172. return nil, fmt.Errorf("delete invalid pending folder: %w", err)
  173. }
  174. }
  175. return res, errFn()
  176. }
  177. func mustMarshal(m proto.Message) []byte {
  178. bs, err := proto.Marshal(m)
  179. if err != nil {
  180. panic(err)
  181. }
  182. return bs
  183. }