quotaupdater.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package dataprovider
  15. import (
  16. "sync"
  17. "time"
  18. "github.com/drakkan/sftpgo/v2/logger"
  19. )
  20. var delayedQuotaUpdater quotaUpdater
  21. func init() {
  22. delayedQuotaUpdater = newQuotaUpdater()
  23. }
  // quotaObject is the not yet stored quota delta accumulated for a single
  // user or folder.
  type quotaObject struct {
  	// size is the pending size delta, it can be negative
  	size int64
  	// files is the pending delta for the number of files, it can be negative
  	files int
  }
  // transferQuotaObject is the not yet stored transfer quota delta accumulated
  // for a single user: ulSize is the upload part, dlSize the download part.
  type transferQuotaObject struct {
  	ulSize, dlSize int64
  }
  32. type quotaUpdater struct {
  33. paramsMutex sync.RWMutex
  34. waitTime time.Duration
  35. sync.RWMutex
  36. pendingUserQuotaUpdates map[string]quotaObject
  37. pendingFolderQuotaUpdates map[string]quotaObject
  38. pendingTransferQuotaUpdates map[string]transferQuotaObject
  39. }
  40. func newQuotaUpdater() quotaUpdater {
  41. return quotaUpdater{
  42. pendingUserQuotaUpdates: make(map[string]quotaObject),
  43. pendingFolderQuotaUpdates: make(map[string]quotaObject),
  44. pendingTransferQuotaUpdates: make(map[string]transferQuotaObject),
  45. }
  46. }
  47. func (q *quotaUpdater) start() {
  48. q.setWaitTime(config.DelayedQuotaUpdate)
  49. go q.loop()
  50. }
  51. func (q *quotaUpdater) loop() {
  52. waitTime := q.getWaitTime()
  53. providerLog(logger.LevelDebug, "delayed quota update loop started, wait time: %v", waitTime)
  54. for waitTime > 0 {
  55. // We do this with a time.Sleep instead of a time.Ticker because we don't know
  56. // how long each quota processing cycle will take, and we want to make
  57. // sure we wait the configured seconds between each iteration
  58. time.Sleep(waitTime)
  59. providerLog(logger.LevelDebug, "delayed quota update check start")
  60. q.storeUsersQuota()
  61. q.storeFoldersQuota()
  62. q.storeUsersTransferQuota()
  63. providerLog(logger.LevelDebug, "delayed quota update check end")
  64. waitTime = q.getWaitTime()
  65. }
  66. providerLog(logger.LevelDebug, "delayed quota update loop ended, wait time: %v", waitTime)
  67. }
  68. func (q *quotaUpdater) setWaitTime(secs int) {
  69. q.paramsMutex.Lock()
  70. defer q.paramsMutex.Unlock()
  71. q.waitTime = time.Duration(secs) * time.Second
  72. }
  73. func (q *quotaUpdater) getWaitTime() time.Duration {
  74. q.paramsMutex.RLock()
  75. defer q.paramsMutex.RUnlock()
  76. return q.waitTime
  77. }
  78. func (q *quotaUpdater) resetUserQuota(username string) {
  79. q.Lock()
  80. defer q.Unlock()
  81. delete(q.pendingUserQuotaUpdates, username)
  82. }
  83. func (q *quotaUpdater) updateUserQuota(username string, files int, size int64) {
  84. q.Lock()
  85. defer q.Unlock()
  86. obj := q.pendingUserQuotaUpdates[username]
  87. obj.size += size
  88. obj.files += files
  89. if obj.files == 0 && obj.size == 0 {
  90. delete(q.pendingUserQuotaUpdates, username)
  91. return
  92. }
  93. q.pendingUserQuotaUpdates[username] = obj
  94. }
  95. func (q *quotaUpdater) getUserPendingQuota(username string) (int, int64) {
  96. q.RLock()
  97. defer q.RUnlock()
  98. obj := q.pendingUserQuotaUpdates[username]
  99. return obj.files, obj.size
  100. }
  101. func (q *quotaUpdater) resetFolderQuota(name string) {
  102. q.Lock()
  103. defer q.Unlock()
  104. delete(q.pendingFolderQuotaUpdates, name)
  105. }
  106. func (q *quotaUpdater) updateFolderQuota(name string, files int, size int64) {
  107. q.Lock()
  108. defer q.Unlock()
  109. obj := q.pendingFolderQuotaUpdates[name]
  110. obj.size += size
  111. obj.files += files
  112. if obj.files == 0 && obj.size == 0 {
  113. delete(q.pendingFolderQuotaUpdates, name)
  114. return
  115. }
  116. q.pendingFolderQuotaUpdates[name] = obj
  117. }
  118. func (q *quotaUpdater) getFolderPendingQuota(name string) (int, int64) {
  119. q.RLock()
  120. defer q.RUnlock()
  121. obj := q.pendingFolderQuotaUpdates[name]
  122. return obj.files, obj.size
  123. }
  124. func (q *quotaUpdater) resetUserTransferQuota(username string) {
  125. q.Lock()
  126. defer q.Unlock()
  127. delete(q.pendingTransferQuotaUpdates, username)
  128. }
  129. func (q *quotaUpdater) updateUserTransferQuota(username string, ulSize, dlSize int64) {
  130. q.Lock()
  131. defer q.Unlock()
  132. obj := q.pendingTransferQuotaUpdates[username]
  133. obj.ulSize += ulSize
  134. obj.dlSize += dlSize
  135. if obj.ulSize == 0 && obj.dlSize == 0 {
  136. delete(q.pendingTransferQuotaUpdates, username)
  137. return
  138. }
  139. q.pendingTransferQuotaUpdates[username] = obj
  140. }
  141. func (q *quotaUpdater) getUserPendingTransferQuota(username string) (int64, int64) {
  142. q.RLock()
  143. defer q.RUnlock()
  144. obj := q.pendingTransferQuotaUpdates[username]
  145. return obj.ulSize, obj.dlSize
  146. }
  147. func (q *quotaUpdater) getUsernames() []string {
  148. q.RLock()
  149. defer q.RUnlock()
  150. result := make([]string, 0, len(q.pendingUserQuotaUpdates))
  151. for username := range q.pendingUserQuotaUpdates {
  152. result = append(result, username)
  153. }
  154. return result
  155. }
  156. func (q *quotaUpdater) getFoldernames() []string {
  157. q.RLock()
  158. defer q.RUnlock()
  159. result := make([]string, 0, len(q.pendingFolderQuotaUpdates))
  160. for name := range q.pendingFolderQuotaUpdates {
  161. result = append(result, name)
  162. }
  163. return result
  164. }
  165. func (q *quotaUpdater) getTransferQuotaUsernames() []string {
  166. q.RLock()
  167. defer q.RUnlock()
  168. result := make([]string, 0, len(q.pendingTransferQuotaUpdates))
  169. for username := range q.pendingTransferQuotaUpdates {
  170. result = append(result, username)
  171. }
  172. return result
  173. }
  174. func (q *quotaUpdater) storeUsersQuota() {
  175. for _, username := range q.getUsernames() {
  176. files, size := q.getUserPendingQuota(username)
  177. if size != 0 || files != 0 {
  178. err := provider.updateQuota(username, files, size, false)
  179. if err != nil {
  180. providerLog(logger.LevelWarn, "unable to update quota delayed for user %#v: %v", username, err)
  181. continue
  182. }
  183. q.updateUserQuota(username, -files, -size)
  184. }
  185. }
  186. }
  187. func (q *quotaUpdater) storeFoldersQuota() {
  188. for _, name := range q.getFoldernames() {
  189. files, size := q.getFolderPendingQuota(name)
  190. if size != 0 || files != 0 {
  191. err := provider.updateFolderQuota(name, files, size, false)
  192. if err != nil {
  193. providerLog(logger.LevelWarn, "unable to update quota delayed for folder %#v: %v", name, err)
  194. continue
  195. }
  196. q.updateFolderQuota(name, -files, -size)
  197. }
  198. }
  199. }
  200. func (q *quotaUpdater) storeUsersTransferQuota() {
  201. for _, username := range q.getTransferQuotaUsernames() {
  202. ulSize, dlSize := q.getUserPendingTransferQuota(username)
  203. if ulSize != 0 || dlSize != 0 {
  204. err := provider.updateTransferQuota(username, ulSize, dlSize, false)
  205. if err != nil {
  206. providerLog(logger.LevelWarn, "unable to update transfer quota delayed for user %#v: %v", username, err)
  207. continue
  208. }
  209. q.updateUserTransferQuota(username, -ulSize, -dlSize)
  210. }
  211. }
  212. }