scheduler.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package dataprovider
  15. import (
  16. "fmt"
  17. "sync/atomic"
  18. "time"
  19. "github.com/robfig/cron/v3"
  20. "github.com/drakkan/sftpgo/v2/internal/logger"
  21. "github.com/drakkan/sftpgo/v2/internal/metric"
  22. "github.com/drakkan/sftpgo/v2/internal/util"
  23. )
  24. var (
  25. scheduler *cron.Cron
  26. lastUserCacheUpdate atomic.Int64
  27. lastIPListsCacheUpdate atomic.Int64
  28. // used for bolt and memory providers, so we avoid iterating all users/rules
  29. // to find recently modified ones
  30. lastUserUpdate atomic.Int64
  31. lastRuleUpdate atomic.Int64
  32. )
  33. func stopScheduler() {
  34. if scheduler != nil {
  35. scheduler.Stop()
  36. scheduler = nil
  37. }
  38. }
  39. func startScheduler() error {
  40. stopScheduler()
  41. scheduler = cron.New(cron.WithLocation(time.UTC))
  42. _, err := scheduler.AddFunc("@every 55s", checkDataprovider)
  43. if err != nil {
  44. return fmt.Errorf("unable to schedule dataprovider availability check: %w", err)
  45. }
  46. err = addScheduledCacheUpdates()
  47. if err != nil {
  48. return err
  49. }
  50. if currentNode != nil {
  51. _, err = scheduler.AddFunc("@every 30m", func() {
  52. err := provider.cleanupNodes()
  53. if err != nil {
  54. providerLog(logger.LevelError, "unable to cleanup nodes: %v", err)
  55. } else {
  56. providerLog(logger.LevelDebug, "cleanup nodes ok")
  57. }
  58. })
  59. }
  60. if err != nil {
  61. return fmt.Errorf("unable to schedule nodes cleanup: %w", err)
  62. }
  63. scheduler.Start()
  64. return nil
  65. }
  66. func addScheduledCacheUpdates() error {
  67. lastUserCacheUpdate.Store(util.GetTimeAsMsSinceEpoch(time.Now()))
  68. lastIPListsCacheUpdate.Store(util.GetTimeAsMsSinceEpoch(time.Now()))
  69. _, err := scheduler.AddFunc("@every 10m", checkCacheUpdates)
  70. if err != nil {
  71. return fmt.Errorf("unable to schedule cache updates: %w", err)
  72. }
  73. return nil
  74. }
  75. func checkDataprovider() {
  76. if currentNode != nil {
  77. if err := provider.updateNodeTimestamp(); err != nil {
  78. providerLog(logger.LevelError, "unable to update node timestamp: %v", err)
  79. } else {
  80. providerLog(logger.LevelDebug, "node timestamp updated")
  81. }
  82. }
  83. err := provider.checkAvailability()
  84. if err != nil {
  85. providerLog(logger.LevelError, "check availability error: %v", err)
  86. }
  87. metric.UpdateDataProviderAvailability(err)
  88. }
  89. func checkCacheUpdates() {
  90. checkUserCache()
  91. checkIPListEntryCache()
  92. }
  93. func checkUserCache() {
  94. lastCheck := lastUserCacheUpdate.Load()
  95. providerLog(logger.LevelDebug, "start user cache check, update time %v", util.GetTimeFromMsecSinceEpoch(lastCheck))
  96. checkTime := util.GetTimeAsMsSinceEpoch(time.Now())
  97. if config.IsShared == 1 {
  98. lastCheck -= 5000
  99. }
  100. users, err := provider.getRecentlyUpdatedUsers(lastCheck)
  101. if err != nil {
  102. providerLog(logger.LevelError, "unable to get recently updated users: %v", err)
  103. return
  104. }
  105. for idx := range users {
  106. user := users[idx]
  107. providerLog(logger.LevelDebug, "invalidate caches for user %q", user.Username)
  108. if user.DeletedAt > 0 {
  109. deletedAt := util.GetTimeFromMsecSinceEpoch(user.DeletedAt)
  110. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  111. providerLog(logger.LevelDebug, "removing user %q deleted at %s", user.Username, deletedAt)
  112. go provider.deleteUser(user, false) //nolint:errcheck
  113. }
  114. webDAVUsersCache.remove(user.Username)
  115. delayedQuotaUpdater.resetUserQuota(user.Username)
  116. } else {
  117. webDAVUsersCache.swap(&user)
  118. }
  119. cachedPasswords.Remove(user.Username)
  120. }
  121. lastUserCacheUpdate.Store(checkTime)
  122. providerLog(logger.LevelDebug, "end user cache check, new update time %v", util.GetTimeFromMsecSinceEpoch(lastUserCacheUpdate.Load()))
  123. }
  124. func checkIPListEntryCache() {
  125. if config.IsShared != 1 {
  126. return
  127. }
  128. hasMemoryLists := false
  129. for _, l := range inMemoryLists {
  130. if l.isInMemory.Load() {
  131. hasMemoryLists = true
  132. break
  133. }
  134. }
  135. if !hasMemoryLists {
  136. return
  137. }
  138. providerLog(logger.LevelDebug, "start IP list cache check, update time %v", util.GetTimeFromMsecSinceEpoch(lastIPListsCacheUpdate.Load()))
  139. checkTime := util.GetTimeAsMsSinceEpoch(time.Now())
  140. entries, err := provider.getRecentlyUpdatedIPListEntries(lastIPListsCacheUpdate.Load() - 5000)
  141. if err != nil {
  142. providerLog(logger.LevelError, "unable to get recently updated IP list entries: %v", err)
  143. return
  144. }
  145. for idx := range entries {
  146. e := entries[idx]
  147. providerLog(logger.LevelDebug, "update cache for IP list entry %q", e.getName())
  148. if e.DeletedAt > 0 {
  149. deletedAt := util.GetTimeFromMsecSinceEpoch(e.DeletedAt)
  150. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  151. providerLog(logger.LevelDebug, "removing IP list entry %q deleted at %s", e.getName(), deletedAt)
  152. go provider.deleteIPListEntry(e, false) //nolint:errcheck
  153. }
  154. for _, l := range inMemoryLists {
  155. l.removeEntry(&e)
  156. }
  157. } else {
  158. for _, l := range inMemoryLists {
  159. l.updateEntry(&e)
  160. }
  161. }
  162. }
  163. lastIPListsCacheUpdate.Store(checkTime)
  164. providerLog(logger.LevelDebug, "end IP list entries cache check, new update time %v", util.GetTimeFromMsecSinceEpoch(lastIPListsCacheUpdate.Load()))
  165. }
  166. func setLastUserUpdate() {
  167. lastUserUpdate.Store(util.GetTimeAsMsSinceEpoch(time.Now()))
  168. }
  169. func getLastUserUpdate() int64 {
  170. return lastUserUpdate.Load()
  171. }
  172. func setLastRuleUpdate() {
  173. lastRuleUpdate.Store(util.GetTimeAsMsSinceEpoch(time.Now()))
  174. }
  175. func getLastRuleUpdate() int64 {
  176. return lastRuleUpdate.Load()
  177. }