transferschecker_test.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "fmt"
  17. "os"
  18. "path"
  19. "path/filepath"
  20. "strconv"
  21. "strings"
  22. "testing"
  23. "time"
  24. "github.com/rs/xid"
  25. "github.com/sftpgo/sdk"
  26. "github.com/stretchr/testify/assert"
  27. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  28. "github.com/drakkan/sftpgo/v2/internal/util"
  29. "github.com/drakkan/sftpgo/v2/internal/vfs"
  30. )
  31. func TestTransfersCheckerDiskQuota(t *testing.T) {
  32. username := "transfers_check_username"
  33. folderName := "test_transfers_folder"
  34. groupName := "test_transfers_group"
  35. vdirPath := "/vdir"
  36. group := dataprovider.Group{
  37. BaseGroup: sdk.BaseGroup{
  38. Name: groupName,
  39. },
  40. UserSettings: dataprovider.GroupUserSettings{
  41. BaseGroupUserSettings: sdk.BaseGroupUserSettings{
  42. QuotaSize: 120,
  43. },
  44. },
  45. }
  46. folder := vfs.BaseVirtualFolder{
  47. Name: folderName,
  48. MappedPath: filepath.Join(os.TempDir(), folderName),
  49. }
  50. user := dataprovider.User{
  51. BaseUser: sdk.BaseUser{
  52. Username: username,
  53. Password: "testpwd",
  54. HomeDir: filepath.Join(os.TempDir(), username),
  55. Status: 1,
  56. QuotaSize: 0, // the quota size defined for the group is used
  57. Permissions: map[string][]string{
  58. "/": {dataprovider.PermAny},
  59. },
  60. },
  61. VirtualFolders: []vfs.VirtualFolder{
  62. {
  63. BaseVirtualFolder: vfs.BaseVirtualFolder{
  64. Name: folderName,
  65. },
  66. VirtualPath: vdirPath,
  67. QuotaSize: 100,
  68. },
  69. },
  70. Groups: []sdk.GroupMapping{
  71. {
  72. Name: groupName,
  73. Type: sdk.GroupTypePrimary,
  74. },
  75. },
  76. }
  77. err := dataprovider.AddGroup(&group, "", "", "")
  78. assert.NoError(t, err)
  79. group, err = dataprovider.GroupExists(groupName)
  80. assert.NoError(t, err)
  81. err = dataprovider.AddFolder(&folder, "", "", "")
  82. assert.NoError(t, err)
  83. assert.Equal(t, int64(120), group.UserSettings.QuotaSize)
  84. err = dataprovider.AddUser(&user, "", "", "")
  85. assert.NoError(t, err)
  86. user, err = dataprovider.GetUserWithGroupSettings(username, "")
  87. assert.NoError(t, err)
  88. connID1 := xid.New().String()
  89. fsUser, err := user.GetFilesystemForPath("/file1", connID1)
  90. assert.NoError(t, err)
  91. conn1 := NewBaseConnection(connID1, ProtocolSFTP, "", "", user)
  92. fakeConn1 := &fakeConnection{
  93. BaseConnection: conn1,
  94. }
  95. transfer1 := NewBaseTransfer(nil, conn1, nil, filepath.Join(user.HomeDir, "file1"), filepath.Join(user.HomeDir, "file1"),
  96. "/file1", TransferUpload, 0, 0, 120, 0, true, fsUser, dataprovider.TransferQuota{})
  97. transfer1.BytesReceived.Store(150)
  98. err = Connections.Add(fakeConn1)
  99. assert.NoError(t, err)
  100. // the transferschecker will do nothing if there is only one ongoing transfer
  101. Connections.checkTransfers()
  102. assert.Nil(t, transfer1.errAbort)
  103. connID2 := xid.New().String()
  104. conn2 := NewBaseConnection(connID2, ProtocolSFTP, "", "", user)
  105. fakeConn2 := &fakeConnection{
  106. BaseConnection: conn2,
  107. }
  108. transfer2 := NewBaseTransfer(nil, conn2, nil, filepath.Join(user.HomeDir, "file2"), filepath.Join(user.HomeDir, "file2"),
  109. "/file2", TransferUpload, 0, 0, 120, 40, true, fsUser, dataprovider.TransferQuota{})
  110. transfer1.BytesReceived.Store(50)
  111. transfer2.BytesReceived.Store(60)
  112. err = Connections.Add(fakeConn2)
  113. assert.NoError(t, err)
  114. connID3 := xid.New().String()
  115. conn3 := NewBaseConnection(connID3, ProtocolSFTP, "", "", user)
  116. fakeConn3 := &fakeConnection{
  117. BaseConnection: conn3,
  118. }
  119. transfer3 := NewBaseTransfer(nil, conn3, nil, filepath.Join(user.HomeDir, "file3"), filepath.Join(user.HomeDir, "file3"),
  120. "/file3", TransferDownload, 0, 0, 120, 0, true, fsUser, dataprovider.TransferQuota{})
  121. transfer3.BytesReceived.Store(60) // this value will be ignored, this is a download
  122. err = Connections.Add(fakeConn3)
  123. assert.NoError(t, err)
  124. // the transfers are not overquota
  125. Connections.checkTransfers()
  126. assert.Nil(t, transfer1.errAbort)
  127. assert.Nil(t, transfer2.errAbort)
  128. assert.Nil(t, transfer3.errAbort)
  129. transfer1.BytesReceived.Store(80) // truncated size will be subtracted, we are not overquota
  130. Connections.checkTransfers()
  131. assert.Nil(t, transfer1.errAbort)
  132. assert.Nil(t, transfer2.errAbort)
  133. assert.Nil(t, transfer3.errAbort)
  134. transfer1.BytesReceived.Store(120)
  135. // we are now overquota
  136. // if another check is in progress nothing is done
  137. Connections.transfersCheckStatus.Store(true)
  138. Connections.checkTransfers()
  139. assert.Nil(t, transfer1.errAbort)
  140. assert.Nil(t, transfer2.errAbort)
  141. assert.Nil(t, transfer3.errAbort)
  142. Connections.transfersCheckStatus.Store(false)
  143. Connections.checkTransfers()
  144. assert.True(t, conn1.IsQuotaExceededError(transfer1.errAbort), transfer1.errAbort)
  145. assert.True(t, conn2.IsQuotaExceededError(transfer2.errAbort), transfer2.errAbort)
  146. assert.True(t, conn1.IsQuotaExceededError(transfer1.GetAbortError()))
  147. assert.Nil(t, transfer3.errAbort)
  148. assert.True(t, conn3.IsQuotaExceededError(transfer3.GetAbortError()))
  149. // update the user quota size
  150. group.UserSettings.QuotaSize = 1000
  151. err = dataprovider.UpdateGroup(&group, []string{username}, "", "", "")
  152. assert.NoError(t, err)
  153. transfer1.errAbort = nil
  154. transfer2.errAbort = nil
  155. Connections.checkTransfers()
  156. assert.Nil(t, transfer1.errAbort)
  157. assert.Nil(t, transfer2.errAbort)
  158. assert.Nil(t, transfer3.errAbort)
  159. group.UserSettings.QuotaSize = 0
  160. err = dataprovider.UpdateGroup(&group, []string{username}, "", "", "")
  161. assert.NoError(t, err)
  162. Connections.checkTransfers()
  163. assert.Nil(t, transfer1.errAbort)
  164. assert.Nil(t, transfer2.errAbort)
  165. assert.Nil(t, transfer3.errAbort)
  166. // now check a public folder
  167. transfer1.BytesReceived.Store(0)
  168. transfer2.BytesReceived.Store(0)
  169. connID4 := xid.New().String()
  170. fsFolder, err := user.GetFilesystemForPath(path.Join(vdirPath, "/file1"), connID4)
  171. assert.NoError(t, err)
  172. conn4 := NewBaseConnection(connID4, ProtocolSFTP, "", "", user)
  173. fakeConn4 := &fakeConnection{
  174. BaseConnection: conn4,
  175. }
  176. transfer4 := NewBaseTransfer(nil, conn4, nil, filepath.Join(os.TempDir(), folderName, "file1"),
  177. filepath.Join(os.TempDir(), folderName, "file1"), path.Join(vdirPath, "/file1"), TransferUpload, 0, 0,
  178. 100, 0, true, fsFolder, dataprovider.TransferQuota{})
  179. err = Connections.Add(fakeConn4)
  180. assert.NoError(t, err)
  181. connID5 := xid.New().String()
  182. conn5 := NewBaseConnection(connID5, ProtocolSFTP, "", "", user)
  183. fakeConn5 := &fakeConnection{
  184. BaseConnection: conn5,
  185. }
  186. transfer5 := NewBaseTransfer(nil, conn5, nil, filepath.Join(os.TempDir(), folderName, "file2"),
  187. filepath.Join(os.TempDir(), folderName, "file2"), path.Join(vdirPath, "/file2"), TransferUpload, 0, 0,
  188. 100, 0, true, fsFolder, dataprovider.TransferQuota{})
  189. err = Connections.Add(fakeConn5)
  190. assert.NoError(t, err)
  191. transfer4.BytesReceived.Store(50)
  192. transfer5.BytesReceived.Store(40)
  193. Connections.checkTransfers()
  194. assert.Nil(t, transfer4.errAbort)
  195. assert.Nil(t, transfer5.errAbort)
  196. transfer5.BytesReceived.Store(60)
  197. Connections.checkTransfers()
  198. assert.Nil(t, transfer1.errAbort)
  199. assert.Nil(t, transfer2.errAbort)
  200. assert.Nil(t, transfer3.errAbort)
  201. assert.True(t, conn1.IsQuotaExceededError(transfer4.errAbort))
  202. assert.True(t, conn2.IsQuotaExceededError(transfer5.errAbort))
  203. if dataprovider.GetProviderStatus().Driver != dataprovider.MemoryDataProviderName {
  204. providerConf := dataprovider.GetProviderConfig()
  205. err = dataprovider.Close()
  206. assert.NoError(t, err)
  207. transfer4.errAbort = nil
  208. transfer5.errAbort = nil
  209. Connections.checkTransfers()
  210. assert.Nil(t, transfer1.errAbort)
  211. assert.Nil(t, transfer2.errAbort)
  212. assert.Nil(t, transfer3.errAbort)
  213. assert.Nil(t, transfer4.errAbort)
  214. assert.Nil(t, transfer5.errAbort)
  215. err = dataprovider.Initialize(providerConf, configDir, true)
  216. assert.NoError(t, err)
  217. }
  218. err = transfer1.Close()
  219. assert.NoError(t, err)
  220. err = transfer2.Close()
  221. assert.NoError(t, err)
  222. err = transfer3.Close()
  223. assert.NoError(t, err)
  224. err = transfer4.Close()
  225. assert.NoError(t, err)
  226. err = transfer5.Close()
  227. assert.NoError(t, err)
  228. Connections.Remove(fakeConn1.GetID())
  229. Connections.Remove(fakeConn2.GetID())
  230. Connections.Remove(fakeConn3.GetID())
  231. Connections.Remove(fakeConn4.GetID())
  232. Connections.Remove(fakeConn5.GetID())
  233. stats := Connections.GetStats("")
  234. assert.Len(t, stats, 0)
  235. assert.Equal(t, int32(0), Connections.GetTotalTransfers())
  236. err = dataprovider.DeleteUser(user.Username, "", "", "")
  237. assert.NoError(t, err)
  238. err = os.RemoveAll(user.GetHomeDir())
  239. assert.NoError(t, err)
  240. err = dataprovider.DeleteFolder(folderName, "", "", "")
  241. assert.NoError(t, err)
  242. err = os.RemoveAll(filepath.Join(os.TempDir(), folderName))
  243. assert.NoError(t, err)
  244. err = dataprovider.DeleteGroup(groupName, "", "", "")
  245. assert.NoError(t, err)
  246. }
  247. func TestTransferCheckerTransferQuota(t *testing.T) {
  248. username := "transfers_check_username"
  249. user := dataprovider.User{
  250. BaseUser: sdk.BaseUser{
  251. Username: username,
  252. Password: "test_pwd",
  253. HomeDir: filepath.Join(os.TempDir(), username),
  254. Status: 1,
  255. TotalDataTransfer: 1,
  256. Permissions: map[string][]string{
  257. "/": {dataprovider.PermAny},
  258. },
  259. },
  260. }
  261. err := dataprovider.AddUser(&user, "", "", "")
  262. assert.NoError(t, err)
  263. connID1 := xid.New().String()
  264. fsUser, err := user.GetFilesystemForPath("/file1", connID1)
  265. assert.NoError(t, err)
  266. conn1 := NewBaseConnection(connID1, ProtocolSFTP, "", "192.168.1.1", user)
  267. fakeConn1 := &fakeConnection{
  268. BaseConnection: conn1,
  269. }
  270. transfer1 := NewBaseTransfer(nil, conn1, nil, filepath.Join(user.HomeDir, "file1"), filepath.Join(user.HomeDir, "file1"),
  271. "/file1", TransferUpload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedTotalSize: 100})
  272. transfer1.BytesReceived.Store(150)
  273. err = Connections.Add(fakeConn1)
  274. assert.NoError(t, err)
  275. // the transferschecker will do nothing if there is only one ongoing transfer
  276. Connections.checkTransfers()
  277. assert.Nil(t, transfer1.errAbort)
  278. connID2 := xid.New().String()
  279. conn2 := NewBaseConnection(connID2, ProtocolSFTP, "", "127.0.0.1", user)
  280. fakeConn2 := &fakeConnection{
  281. BaseConnection: conn2,
  282. }
  283. transfer2 := NewBaseTransfer(nil, conn2, nil, filepath.Join(user.HomeDir, "file2"), filepath.Join(user.HomeDir, "file2"),
  284. "/file2", TransferUpload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedTotalSize: 100})
  285. transfer2.BytesReceived.Store(150)
  286. err = Connections.Add(fakeConn2)
  287. assert.NoError(t, err)
  288. Connections.checkTransfers()
  289. assert.Nil(t, transfer1.errAbort)
  290. assert.Nil(t, transfer2.errAbort)
  291. // now test overquota
  292. transfer1.BytesReceived.Store(1024*1024 + 1)
  293. transfer2.BytesReceived.Store(0)
  294. Connections.checkTransfers()
  295. assert.True(t, conn1.IsQuotaExceededError(transfer1.errAbort), transfer1.errAbort)
  296. assert.Nil(t, transfer2.errAbort)
  297. transfer1.errAbort = nil
  298. transfer1.BytesReceived.Store(1024*1024 + 1)
  299. transfer2.BytesReceived.Store(1024)
  300. Connections.checkTransfers()
  301. assert.True(t, conn1.IsQuotaExceededError(transfer1.errAbort))
  302. assert.True(t, conn2.IsQuotaExceededError(transfer2.errAbort))
  303. transfer1.BytesReceived.Store(0)
  304. transfer2.BytesReceived.Store(0)
  305. transfer1.errAbort = nil
  306. transfer2.errAbort = nil
  307. err = transfer1.Close()
  308. assert.NoError(t, err)
  309. err = transfer2.Close()
  310. assert.NoError(t, err)
  311. Connections.Remove(fakeConn1.GetID())
  312. Connections.Remove(fakeConn2.GetID())
  313. connID3 := xid.New().String()
  314. conn3 := NewBaseConnection(connID3, ProtocolSFTP, "", "", user)
  315. fakeConn3 := &fakeConnection{
  316. BaseConnection: conn3,
  317. }
  318. transfer3 := NewBaseTransfer(nil, conn3, nil, filepath.Join(user.HomeDir, "file1"), filepath.Join(user.HomeDir, "file1"),
  319. "/file1", TransferDownload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedDLSize: 100})
  320. transfer3.BytesSent.Store(150)
  321. err = Connections.Add(fakeConn3)
  322. assert.NoError(t, err)
  323. connID4 := xid.New().String()
  324. conn4 := NewBaseConnection(connID4, ProtocolSFTP, "", "", user)
  325. fakeConn4 := &fakeConnection{
  326. BaseConnection: conn4,
  327. }
  328. transfer4 := NewBaseTransfer(nil, conn4, nil, filepath.Join(user.HomeDir, "file2"), filepath.Join(user.HomeDir, "file2"),
  329. "/file2", TransferDownload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedDLSize: 100})
  330. transfer4.BytesSent.Store(150)
  331. err = Connections.Add(fakeConn4)
  332. assert.NoError(t, err)
  333. Connections.checkTransfers()
  334. assert.Nil(t, transfer3.errAbort)
  335. assert.Nil(t, transfer4.errAbort)
  336. transfer3.BytesSent.Store(512 * 1024)
  337. transfer4.BytesSent.Store(512*1024 + 1)
  338. Connections.checkTransfers()
  339. if assert.Error(t, transfer3.errAbort) {
  340. assert.Contains(t, transfer3.errAbort.Error(), ErrReadQuotaExceeded.Error())
  341. }
  342. if assert.Error(t, transfer4.errAbort) {
  343. assert.Contains(t, transfer4.errAbort.Error(), ErrReadQuotaExceeded.Error())
  344. }
  345. err = transfer3.Close()
  346. assert.NoError(t, err)
  347. err = transfer4.Close()
  348. assert.NoError(t, err)
  349. Connections.Remove(fakeConn3.GetID())
  350. Connections.Remove(fakeConn4.GetID())
  351. stats := Connections.GetStats("")
  352. assert.Len(t, stats, 0)
  353. assert.Equal(t, int32(0), Connections.GetTotalTransfers())
  354. err = dataprovider.DeleteUser(user.Username, "", "", "")
  355. assert.NoError(t, err)
  356. err = os.RemoveAll(user.GetHomeDir())
  357. assert.NoError(t, err)
  358. }
  359. func TestAggregateTransfers(t *testing.T) {
  360. checker := transfersCheckerMem{}
  361. checker.AddTransfer(dataprovider.ActiveTransfer{
  362. ID: 1,
  363. Type: TransferUpload,
  364. ConnID: "1",
  365. Username: "user",
  366. FolderName: "",
  367. TruncatedSize: 0,
  368. CurrentULSize: 100,
  369. CurrentDLSize: 0,
  370. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  371. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  372. })
  373. usersToFetch, aggregations := checker.aggregateUploadTransfers()
  374. assert.Len(t, usersToFetch, 0)
  375. assert.Len(t, aggregations, 1)
  376. checker.AddTransfer(dataprovider.ActiveTransfer{
  377. ID: 1,
  378. Type: TransferDownload,
  379. ConnID: "2",
  380. Username: "user",
  381. FolderName: "",
  382. TruncatedSize: 0,
  383. CurrentULSize: 0,
  384. CurrentDLSize: 100,
  385. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  386. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  387. })
  388. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  389. assert.Len(t, usersToFetch, 0)
  390. assert.Len(t, aggregations, 1)
  391. checker.AddTransfer(dataprovider.ActiveTransfer{
  392. ID: 1,
  393. Type: TransferUpload,
  394. ConnID: "3",
  395. Username: "user",
  396. FolderName: "folder",
  397. TruncatedSize: 0,
  398. CurrentULSize: 10,
  399. CurrentDLSize: 0,
  400. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  401. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  402. })
  403. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  404. assert.Len(t, usersToFetch, 0)
  405. assert.Len(t, aggregations, 2)
  406. checker.AddTransfer(dataprovider.ActiveTransfer{
  407. ID: 1,
  408. Type: TransferUpload,
  409. ConnID: "4",
  410. Username: "user1",
  411. FolderName: "",
  412. TruncatedSize: 0,
  413. CurrentULSize: 100,
  414. CurrentDLSize: 0,
  415. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  416. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  417. })
  418. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  419. assert.Len(t, usersToFetch, 0)
  420. assert.Len(t, aggregations, 3)
  421. checker.AddTransfer(dataprovider.ActiveTransfer{
  422. ID: 1,
  423. Type: TransferUpload,
  424. ConnID: "5",
  425. Username: "user",
  426. FolderName: "",
  427. TruncatedSize: 0,
  428. CurrentULSize: 100,
  429. CurrentDLSize: 0,
  430. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  431. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  432. })
  433. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  434. assert.Len(t, usersToFetch, 1)
  435. val, ok := usersToFetch["user"]
  436. assert.True(t, ok)
  437. assert.False(t, val)
  438. assert.Len(t, aggregations, 3)
  439. aggregate, ok := aggregations[0]
  440. assert.True(t, ok)
  441. assert.Len(t, aggregate, 2)
  442. checker.AddTransfer(dataprovider.ActiveTransfer{
  443. ID: 1,
  444. Type: TransferUpload,
  445. ConnID: "6",
  446. Username: "user",
  447. FolderName: "",
  448. TruncatedSize: 0,
  449. CurrentULSize: 100,
  450. CurrentDLSize: 0,
  451. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  452. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  453. })
  454. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  455. assert.Len(t, usersToFetch, 1)
  456. val, ok = usersToFetch["user"]
  457. assert.True(t, ok)
  458. assert.False(t, val)
  459. assert.Len(t, aggregations, 3)
  460. aggregate, ok = aggregations[0]
  461. assert.True(t, ok)
  462. assert.Len(t, aggregate, 3)
  463. checker.AddTransfer(dataprovider.ActiveTransfer{
  464. ID: 1,
  465. Type: TransferUpload,
  466. ConnID: "7",
  467. Username: "user",
  468. FolderName: "folder",
  469. TruncatedSize: 0,
  470. CurrentULSize: 10,
  471. CurrentDLSize: 0,
  472. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  473. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  474. })
  475. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  476. assert.Len(t, usersToFetch, 1)
  477. val, ok = usersToFetch["user"]
  478. assert.True(t, ok)
  479. assert.True(t, val)
  480. assert.Len(t, aggregations, 3)
  481. aggregate, ok = aggregations[0]
  482. assert.True(t, ok)
  483. assert.Len(t, aggregate, 3)
  484. aggregate, ok = aggregations[1]
  485. assert.True(t, ok)
  486. assert.Len(t, aggregate, 2)
  487. checker.AddTransfer(dataprovider.ActiveTransfer{
  488. ID: 1,
  489. Type: TransferUpload,
  490. ConnID: "8",
  491. Username: "user",
  492. FolderName: "",
  493. TruncatedSize: 0,
  494. CurrentULSize: 100,
  495. CurrentDLSize: 0,
  496. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  497. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  498. })
  499. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  500. assert.Len(t, usersToFetch, 1)
  501. val, ok = usersToFetch["user"]
  502. assert.True(t, ok)
  503. assert.True(t, val)
  504. assert.Len(t, aggregations, 3)
  505. aggregate, ok = aggregations[0]
  506. assert.True(t, ok)
  507. assert.Len(t, aggregate, 4)
  508. aggregate, ok = aggregations[1]
  509. assert.True(t, ok)
  510. assert.Len(t, aggregate, 2)
  511. }
  512. func TestDataTransferExceeded(t *testing.T) {
  513. user := dataprovider.User{
  514. BaseUser: sdk.BaseUser{
  515. TotalDataTransfer: 1,
  516. },
  517. }
  518. transfer := dataprovider.ActiveTransfer{
  519. CurrentULSize: 0,
  520. CurrentDLSize: 0,
  521. }
  522. user.UsedDownloadDataTransfer = 1024 * 1024
  523. user.UsedUploadDataTransfer = 512 * 1024
  524. checker := transfersCheckerMem{}
  525. res := checker.isDataTransferExceeded(user, transfer, 100, 100)
  526. assert.False(t, res)
  527. transfer.CurrentULSize = 1
  528. res = checker.isDataTransferExceeded(user, transfer, 100, 100)
  529. assert.True(t, res)
  530. user.UsedDownloadDataTransfer = 512*1024 - 100
  531. user.UsedUploadDataTransfer = 512*1024 - 100
  532. res = checker.isDataTransferExceeded(user, transfer, 100, 100)
  533. assert.False(t, res)
  534. res = checker.isDataTransferExceeded(user, transfer, 101, 100)
  535. assert.True(t, res)
  536. user.TotalDataTransfer = 0
  537. user.DownloadDataTransfer = 1
  538. user.UsedDownloadDataTransfer = 512 * 1024
  539. transfer.CurrentULSize = 0
  540. transfer.CurrentDLSize = 100
  541. res = checker.isDataTransferExceeded(user, transfer, 0, 512*1024)
  542. assert.False(t, res)
  543. res = checker.isDataTransferExceeded(user, transfer, 0, 512*1024+1)
  544. assert.True(t, res)
  545. user.DownloadDataTransfer = 0
  546. user.UploadDataTransfer = 1
  547. user.UsedUploadDataTransfer = 512 * 1024
  548. transfer.CurrentULSize = 0
  549. transfer.CurrentDLSize = 0
  550. res = checker.isDataTransferExceeded(user, transfer, 512*1024+1, 0)
  551. assert.False(t, res)
  552. transfer.CurrentULSize = 1
  553. res = checker.isDataTransferExceeded(user, transfer, 512*1024+1, 0)
  554. assert.True(t, res)
  555. }
  556. func TestGetUsersForQuotaCheck(t *testing.T) {
  557. usersToFetch := make(map[string]bool)
  558. for i := 0; i < 70; i++ {
  559. usersToFetch[fmt.Sprintf("user%v", i)] = i%2 == 0
  560. }
  561. users, err := dataprovider.GetUsersForQuotaCheck(usersToFetch)
  562. assert.NoError(t, err)
  563. assert.Len(t, users, 0)
  564. for i := 0; i < 60; i++ {
  565. folder := vfs.BaseVirtualFolder{
  566. Name: fmt.Sprintf("f%v", i),
  567. MappedPath: filepath.Join(os.TempDir(), fmt.Sprintf("f%v", i)),
  568. }
  569. user := dataprovider.User{
  570. BaseUser: sdk.BaseUser{
  571. Username: fmt.Sprintf("user%v", i),
  572. Password: "pwd",
  573. HomeDir: filepath.Join(os.TempDir(), fmt.Sprintf("user%v", i)),
  574. Status: 1,
  575. QuotaSize: 120,
  576. Permissions: map[string][]string{
  577. "/": {dataprovider.PermAny},
  578. },
  579. },
  580. VirtualFolders: []vfs.VirtualFolder{
  581. {
  582. BaseVirtualFolder: vfs.BaseVirtualFolder{
  583. Name: folder.Name,
  584. },
  585. VirtualPath: "/vfolder",
  586. QuotaSize: 100,
  587. },
  588. },
  589. }
  590. err = dataprovider.AddFolder(&folder, "", "", "")
  591. assert.NoError(t, err)
  592. err = dataprovider.AddUser(&user, "", "", "")
  593. assert.NoError(t, err)
  594. err = dataprovider.UpdateVirtualFolderQuota(&vfs.BaseVirtualFolder{Name: fmt.Sprintf("f%v", i)}, 1, 50, false)
  595. assert.NoError(t, err)
  596. }
  597. users, err = dataprovider.GetUsersForQuotaCheck(usersToFetch)
  598. assert.NoError(t, err)
  599. assert.Len(t, users, 60)
  600. for _, user := range users {
  601. userIdxStr := strings.Replace(user.Username, "user", "", 1)
  602. userIdx, err := strconv.Atoi(userIdxStr)
  603. assert.NoError(t, err)
  604. if userIdx%2 == 0 {
  605. if assert.Len(t, user.VirtualFolders, 1, user.Username) {
  606. assert.Equal(t, int64(100), user.VirtualFolders[0].QuotaSize)
  607. assert.Equal(t, int64(50), user.VirtualFolders[0].UsedQuotaSize)
  608. }
  609. } else {
  610. switch dataprovider.GetProviderStatus().Driver {
  611. case dataprovider.MySQLDataProviderName, dataprovider.PGSQLDataProviderName,
  612. dataprovider.CockroachDataProviderName, dataprovider.SQLiteDataProviderName:
  613. assert.Len(t, user.VirtualFolders, 0, user.Username)
  614. }
  615. }
  616. ul, dl, total := user.GetDataTransferLimits()
  617. assert.Equal(t, int64(0), ul)
  618. assert.Equal(t, int64(0), dl)
  619. assert.Equal(t, int64(0), total)
  620. }
  621. for i := 0; i < 60; i++ {
  622. err = dataprovider.DeleteUser(fmt.Sprintf("user%v", i), "", "", "")
  623. assert.NoError(t, err)
  624. err = dataprovider.DeleteFolder(fmt.Sprintf("f%v", i), "", "", "")
  625. assert.NoError(t, err)
  626. }
  627. users, err = dataprovider.GetUsersForQuotaCheck(usersToFetch)
  628. assert.NoError(t, err)
  629. assert.Len(t, users, 0)
  630. }
  631. func TestDBTransferChecker(t *testing.T) {
  632. if !isDbTransferCheckerSupported() {
  633. t.Skip("this test is not supported with the current database provider")
  634. }
  635. providerConf := dataprovider.GetProviderConfig()
  636. err := dataprovider.Close()
  637. assert.NoError(t, err)
  638. providerConf.IsShared = 1
  639. err = dataprovider.Initialize(providerConf, configDir, true)
  640. assert.NoError(t, err)
  641. c := getTransfersChecker(1)
  642. checker, ok := c.(*transfersCheckerDB)
  643. assert.True(t, ok)
  644. assert.True(t, checker.lastCleanup.IsZero())
  645. transfer1 := dataprovider.ActiveTransfer{
  646. ID: 1,
  647. Type: TransferDownload,
  648. ConnID: xid.New().String(),
  649. Username: "user1",
  650. FolderName: "folder1",
  651. IP: "127.0.0.1",
  652. }
  653. checker.AddTransfer(transfer1)
  654. transfers, err := dataprovider.GetActiveTransfers(time.Now().Add(24 * time.Hour))
  655. assert.NoError(t, err)
  656. assert.Len(t, transfers, 0)
  657. transfers, err = dataprovider.GetActiveTransfers(time.Now().Add(-periodicTimeoutCheckInterval * 2))
  658. assert.NoError(t, err)
  659. var createdAt, updatedAt int64
  660. if assert.Len(t, transfers, 1) {
  661. transfer := transfers[0]
  662. assert.Equal(t, transfer1.ID, transfer.ID)
  663. assert.Equal(t, transfer1.Type, transfer.Type)
  664. assert.Equal(t, transfer1.ConnID, transfer.ConnID)
  665. assert.Equal(t, transfer1.Username, transfer.Username)
  666. assert.Equal(t, transfer1.IP, transfer.IP)
  667. assert.Equal(t, transfer1.FolderName, transfer.FolderName)
  668. assert.Greater(t, transfer.CreatedAt, int64(0))
  669. assert.Greater(t, transfer.UpdatedAt, int64(0))
  670. assert.Equal(t, int64(0), transfer.CurrentDLSize)
  671. assert.Equal(t, int64(0), transfer.CurrentULSize)
  672. createdAt = transfer.CreatedAt
  673. updatedAt = transfer.UpdatedAt
  674. }
  675. time.Sleep(100 * time.Millisecond)
  676. checker.UpdateTransferCurrentSizes(100, 150, transfer1.ID, transfer1.ConnID)
  677. transfers, err = dataprovider.GetActiveTransfers(time.Now().Add(-periodicTimeoutCheckInterval * 2))
  678. assert.NoError(t, err)
  679. if assert.Len(t, transfers, 1) {
  680. transfer := transfers[0]
  681. assert.Equal(t, int64(150), transfer.CurrentDLSize)
  682. assert.Equal(t, int64(100), transfer.CurrentULSize)
  683. assert.Equal(t, createdAt, transfer.CreatedAt)
  684. assert.Greater(t, transfer.UpdatedAt, updatedAt)
  685. }
  686. res := checker.GetOverquotaTransfers()
  687. assert.Len(t, res, 0)
  688. checker.RemoveTransfer(transfer1.ID, transfer1.ConnID)
  689. transfers, err = dataprovider.GetActiveTransfers(time.Now().Add(-periodicTimeoutCheckInterval * 2))
  690. assert.NoError(t, err)
  691. assert.Len(t, transfers, 0)
  692. err = dataprovider.Close()
  693. assert.NoError(t, err)
  694. res = checker.GetOverquotaTransfers()
  695. assert.Len(t, res, 0)
  696. providerConf.IsShared = 0
  697. err = dataprovider.Initialize(providerConf, configDir, true)
  698. assert.NoError(t, err)
  699. }
  700. func isDbTransferCheckerSupported() bool {
  701. // SQLite shares the implementation with other SQL-based provider but it makes no sense
  702. // to use it outside test cases
  703. switch dataprovider.GetProviderStatus().Driver {
  704. case dataprovider.MySQLDataProviderName, dataprovider.PGSQLDataProviderName,
  705. dataprovider.CockroachDataProviderName, dataprovider.SQLiteDataProviderName:
  706. return true
  707. default:
  708. return false
  709. }
  710. }