transferschecker_test.go 25 KB


  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "fmt"
  17. "os"
  18. "path"
  19. "path/filepath"
  20. "strconv"
  21. "strings"
  22. "testing"
  23. "time"
  24. "github.com/rs/xid"
  25. "github.com/sftpgo/sdk"
  26. "github.com/stretchr/testify/assert"
  27. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  28. "github.com/drakkan/sftpgo/v2/internal/util"
  29. "github.com/drakkan/sftpgo/v2/internal/vfs"
  30. )
  31. func TestTransfersCheckerDiskQuota(t *testing.T) {
  32. username := "transfers_check_username"
  33. folderName := "test_transfers_folder"
  34. groupName := "test_transfers_group"
  35. vdirPath := "/vdir"
  36. group := dataprovider.Group{
  37. BaseGroup: sdk.BaseGroup{
  38. Name: groupName,
  39. },
  40. UserSettings: dataprovider.GroupUserSettings{
  41. BaseGroupUserSettings: sdk.BaseGroupUserSettings{
  42. QuotaSize: 120,
  43. },
  44. },
  45. }
  46. user := dataprovider.User{
  47. BaseUser: sdk.BaseUser{
  48. Username: username,
  49. Password: "testpwd",
  50. HomeDir: filepath.Join(os.TempDir(), username),
  51. Status: 1,
  52. QuotaSize: 0, // the quota size defined for the group is used
  53. Permissions: map[string][]string{
  54. "/": {dataprovider.PermAny},
  55. },
  56. },
  57. VirtualFolders: []vfs.VirtualFolder{
  58. {
  59. BaseVirtualFolder: vfs.BaseVirtualFolder{
  60. Name: folderName,
  61. MappedPath: filepath.Join(os.TempDir(), folderName),
  62. },
  63. VirtualPath: vdirPath,
  64. QuotaSize: 100,
  65. },
  66. },
  67. Groups: []sdk.GroupMapping{
  68. {
  69. Name: groupName,
  70. Type: sdk.GroupTypePrimary,
  71. },
  72. },
  73. }
  74. err := dataprovider.AddGroup(&group, "", "")
  75. assert.NoError(t, err)
  76. group, err = dataprovider.GroupExists(groupName)
  77. assert.NoError(t, err)
  78. assert.Equal(t, int64(120), group.UserSettings.QuotaSize)
  79. err = dataprovider.AddUser(&user, "", "")
  80. assert.NoError(t, err)
  81. user, err = dataprovider.GetUserWithGroupSettings(username)
  82. assert.NoError(t, err)
  83. connID1 := xid.New().String()
  84. fsUser, err := user.GetFilesystemForPath("/file1", connID1)
  85. assert.NoError(t, err)
  86. conn1 := NewBaseConnection(connID1, ProtocolSFTP, "", "", user)
  87. fakeConn1 := &fakeConnection{
  88. BaseConnection: conn1,
  89. }
  90. transfer1 := NewBaseTransfer(nil, conn1, nil, filepath.Join(user.HomeDir, "file1"), filepath.Join(user.HomeDir, "file1"),
  91. "/file1", TransferUpload, 0, 0, 120, 0, true, fsUser, dataprovider.TransferQuota{})
  92. transfer1.BytesReceived.Store(150)
  93. err = Connections.Add(fakeConn1)
  94. assert.NoError(t, err)
  95. // the transferschecker will do nothing if there is only one ongoing transfer
  96. Connections.checkTransfers()
  97. assert.Nil(t, transfer1.errAbort)
  98. connID2 := xid.New().String()
  99. conn2 := NewBaseConnection(connID2, ProtocolSFTP, "", "", user)
  100. fakeConn2 := &fakeConnection{
  101. BaseConnection: conn2,
  102. }
  103. transfer2 := NewBaseTransfer(nil, conn2, nil, filepath.Join(user.HomeDir, "file2"), filepath.Join(user.HomeDir, "file2"),
  104. "/file2", TransferUpload, 0, 0, 120, 40, true, fsUser, dataprovider.TransferQuota{})
  105. transfer1.BytesReceived.Store(50)
  106. transfer2.BytesReceived.Store(60)
  107. err = Connections.Add(fakeConn2)
  108. assert.NoError(t, err)
  109. connID3 := xid.New().String()
  110. conn3 := NewBaseConnection(connID3, ProtocolSFTP, "", "", user)
  111. fakeConn3 := &fakeConnection{
  112. BaseConnection: conn3,
  113. }
  114. transfer3 := NewBaseTransfer(nil, conn3, nil, filepath.Join(user.HomeDir, "file3"), filepath.Join(user.HomeDir, "file3"),
  115. "/file3", TransferDownload, 0, 0, 120, 0, true, fsUser, dataprovider.TransferQuota{})
  116. transfer3.BytesReceived.Store(60) // this value will be ignored, this is a download
  117. err = Connections.Add(fakeConn3)
  118. assert.NoError(t, err)
  119. // the transfers are not overquota
  120. Connections.checkTransfers()
  121. assert.Nil(t, transfer1.errAbort)
  122. assert.Nil(t, transfer2.errAbort)
  123. assert.Nil(t, transfer3.errAbort)
  124. transfer1.BytesReceived.Store(80) // truncated size will be subtracted, we are not overquota
  125. Connections.checkTransfers()
  126. assert.Nil(t, transfer1.errAbort)
  127. assert.Nil(t, transfer2.errAbort)
  128. assert.Nil(t, transfer3.errAbort)
  129. transfer1.BytesReceived.Store(120)
  130. // we are now overquota
  131. // if another check is in progress nothing is done
  132. Connections.transfersCheckStatus.Store(true)
  133. Connections.checkTransfers()
  134. assert.Nil(t, transfer1.errAbort)
  135. assert.Nil(t, transfer2.errAbort)
  136. assert.Nil(t, transfer3.errAbort)
  137. Connections.transfersCheckStatus.Store(false)
  138. Connections.checkTransfers()
  139. assert.True(t, conn1.IsQuotaExceededError(transfer1.errAbort), transfer1.errAbort)
  140. assert.True(t, conn2.IsQuotaExceededError(transfer2.errAbort), transfer2.errAbort)
  141. assert.True(t, conn1.IsQuotaExceededError(transfer1.GetAbortError()))
  142. assert.Nil(t, transfer3.errAbort)
  143. assert.True(t, conn3.IsQuotaExceededError(transfer3.GetAbortError()))
  144. // update the user quota size
  145. group.UserSettings.QuotaSize = 1000
  146. err = dataprovider.UpdateGroup(&group, []string{username}, "", "")
  147. assert.NoError(t, err)
  148. transfer1.errAbort = nil
  149. transfer2.errAbort = nil
  150. Connections.checkTransfers()
  151. assert.Nil(t, transfer1.errAbort)
  152. assert.Nil(t, transfer2.errAbort)
  153. assert.Nil(t, transfer3.errAbort)
  154. group.UserSettings.QuotaSize = 0
  155. err = dataprovider.UpdateGroup(&group, []string{username}, "", "")
  156. assert.NoError(t, err)
  157. Connections.checkTransfers()
  158. assert.Nil(t, transfer1.errAbort)
  159. assert.Nil(t, transfer2.errAbort)
  160. assert.Nil(t, transfer3.errAbort)
  161. // now check a public folder
  162. transfer1.BytesReceived.Store(0)
  163. transfer2.BytesReceived.Store(0)
  164. connID4 := xid.New().String()
  165. fsFolder, err := user.GetFilesystemForPath(path.Join(vdirPath, "/file1"), connID4)
  166. assert.NoError(t, err)
  167. conn4 := NewBaseConnection(connID4, ProtocolSFTP, "", "", user)
  168. fakeConn4 := &fakeConnection{
  169. BaseConnection: conn4,
  170. }
  171. transfer4 := NewBaseTransfer(nil, conn4, nil, filepath.Join(os.TempDir(), folderName, "file1"),
  172. filepath.Join(os.TempDir(), folderName, "file1"), path.Join(vdirPath, "/file1"), TransferUpload, 0, 0,
  173. 100, 0, true, fsFolder, dataprovider.TransferQuota{})
  174. err = Connections.Add(fakeConn4)
  175. assert.NoError(t, err)
  176. connID5 := xid.New().String()
  177. conn5 := NewBaseConnection(connID5, ProtocolSFTP, "", "", user)
  178. fakeConn5 := &fakeConnection{
  179. BaseConnection: conn5,
  180. }
  181. transfer5 := NewBaseTransfer(nil, conn5, nil, filepath.Join(os.TempDir(), folderName, "file2"),
  182. filepath.Join(os.TempDir(), folderName, "file2"), path.Join(vdirPath, "/file2"), TransferUpload, 0, 0,
  183. 100, 0, true, fsFolder, dataprovider.TransferQuota{})
  184. err = Connections.Add(fakeConn5)
  185. assert.NoError(t, err)
  186. transfer4.BytesReceived.Store(50)
  187. transfer5.BytesReceived.Store(40)
  188. Connections.checkTransfers()
  189. assert.Nil(t, transfer4.errAbort)
  190. assert.Nil(t, transfer5.errAbort)
  191. transfer5.BytesReceived.Store(60)
  192. Connections.checkTransfers()
  193. assert.Nil(t, transfer1.errAbort)
  194. assert.Nil(t, transfer2.errAbort)
  195. assert.Nil(t, transfer3.errAbort)
  196. assert.True(t, conn1.IsQuotaExceededError(transfer4.errAbort))
  197. assert.True(t, conn2.IsQuotaExceededError(transfer5.errAbort))
  198. if dataprovider.GetProviderStatus().Driver != dataprovider.MemoryDataProviderName {
  199. providerConf := dataprovider.GetProviderConfig()
  200. err = dataprovider.Close()
  201. assert.NoError(t, err)
  202. transfer4.errAbort = nil
  203. transfer5.errAbort = nil
  204. Connections.checkTransfers()
  205. assert.Nil(t, transfer1.errAbort)
  206. assert.Nil(t, transfer2.errAbort)
  207. assert.Nil(t, transfer3.errAbort)
  208. assert.Nil(t, transfer4.errAbort)
  209. assert.Nil(t, transfer5.errAbort)
  210. err = dataprovider.Initialize(providerConf, configDir, true)
  211. assert.NoError(t, err)
  212. }
  213. err = transfer1.Close()
  214. assert.NoError(t, err)
  215. err = transfer2.Close()
  216. assert.NoError(t, err)
  217. err = transfer3.Close()
  218. assert.NoError(t, err)
  219. err = transfer4.Close()
  220. assert.NoError(t, err)
  221. err = transfer5.Close()
  222. assert.NoError(t, err)
  223. Connections.Remove(fakeConn1.GetID())
  224. Connections.Remove(fakeConn2.GetID())
  225. Connections.Remove(fakeConn3.GetID())
  226. Connections.Remove(fakeConn4.GetID())
  227. Connections.Remove(fakeConn5.GetID())
  228. stats := Connections.GetStats()
  229. assert.Len(t, stats, 0)
  230. err = dataprovider.DeleteUser(user.Username, "", "")
  231. assert.NoError(t, err)
  232. err = os.RemoveAll(user.GetHomeDir())
  233. assert.NoError(t, err)
  234. err = dataprovider.DeleteFolder(folderName, "", "")
  235. assert.NoError(t, err)
  236. err = os.RemoveAll(filepath.Join(os.TempDir(), folderName))
  237. assert.NoError(t, err)
  238. err = dataprovider.DeleteGroup(groupName, "", "")
  239. assert.NoError(t, err)
  240. }
  241. func TestTransferCheckerTransferQuota(t *testing.T) {
  242. username := "transfers_check_username"
  243. user := dataprovider.User{
  244. BaseUser: sdk.BaseUser{
  245. Username: username,
  246. Password: "test_pwd",
  247. HomeDir: filepath.Join(os.TempDir(), username),
  248. Status: 1,
  249. TotalDataTransfer: 1,
  250. Permissions: map[string][]string{
  251. "/": {dataprovider.PermAny},
  252. },
  253. },
  254. }
  255. err := dataprovider.AddUser(&user, "", "")
  256. assert.NoError(t, err)
  257. connID1 := xid.New().String()
  258. fsUser, err := user.GetFilesystemForPath("/file1", connID1)
  259. assert.NoError(t, err)
  260. conn1 := NewBaseConnection(connID1, ProtocolSFTP, "", "192.168.1.1", user)
  261. fakeConn1 := &fakeConnection{
  262. BaseConnection: conn1,
  263. }
  264. transfer1 := NewBaseTransfer(nil, conn1, nil, filepath.Join(user.HomeDir, "file1"), filepath.Join(user.HomeDir, "file1"),
  265. "/file1", TransferUpload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedTotalSize: 100})
  266. transfer1.BytesReceived.Store(150)
  267. err = Connections.Add(fakeConn1)
  268. assert.NoError(t, err)
  269. // the transferschecker will do nothing if there is only one ongoing transfer
  270. Connections.checkTransfers()
  271. assert.Nil(t, transfer1.errAbort)
  272. connID2 := xid.New().String()
  273. conn2 := NewBaseConnection(connID2, ProtocolSFTP, "", "127.0.0.1", user)
  274. fakeConn2 := &fakeConnection{
  275. BaseConnection: conn2,
  276. }
  277. transfer2 := NewBaseTransfer(nil, conn2, nil, filepath.Join(user.HomeDir, "file2"), filepath.Join(user.HomeDir, "file2"),
  278. "/file2", TransferUpload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedTotalSize: 100})
  279. transfer2.BytesReceived.Store(150)
  280. err = Connections.Add(fakeConn2)
  281. assert.NoError(t, err)
  282. Connections.checkTransfers()
  283. assert.Nil(t, transfer1.errAbort)
  284. assert.Nil(t, transfer2.errAbort)
  285. // now test overquota
  286. transfer1.BytesReceived.Store(1024*1024 + 1)
  287. transfer2.BytesReceived.Store(0)
  288. Connections.checkTransfers()
  289. assert.True(t, conn1.IsQuotaExceededError(transfer1.errAbort))
  290. assert.Nil(t, transfer2.errAbort)
  291. transfer1.errAbort = nil
  292. transfer1.BytesReceived.Store(1024*1024 + 1)
  293. transfer2.BytesReceived.Store(1024)
  294. Connections.checkTransfers()
  295. assert.True(t, conn1.IsQuotaExceededError(transfer1.errAbort))
  296. assert.True(t, conn2.IsQuotaExceededError(transfer2.errAbort))
  297. transfer1.BytesReceived.Store(0)
  298. transfer2.BytesReceived.Store(0)
  299. transfer1.errAbort = nil
  300. transfer2.errAbort = nil
  301. err = transfer1.Close()
  302. assert.NoError(t, err)
  303. err = transfer2.Close()
  304. assert.NoError(t, err)
  305. Connections.Remove(fakeConn1.GetID())
  306. Connections.Remove(fakeConn2.GetID())
  307. connID3 := xid.New().String()
  308. conn3 := NewBaseConnection(connID3, ProtocolSFTP, "", "", user)
  309. fakeConn3 := &fakeConnection{
  310. BaseConnection: conn3,
  311. }
  312. transfer3 := NewBaseTransfer(nil, conn3, nil, filepath.Join(user.HomeDir, "file1"), filepath.Join(user.HomeDir, "file1"),
  313. "/file1", TransferDownload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedDLSize: 100})
  314. transfer3.BytesSent.Store(150)
  315. err = Connections.Add(fakeConn3)
  316. assert.NoError(t, err)
  317. connID4 := xid.New().String()
  318. conn4 := NewBaseConnection(connID4, ProtocolSFTP, "", "", user)
  319. fakeConn4 := &fakeConnection{
  320. BaseConnection: conn4,
  321. }
  322. transfer4 := NewBaseTransfer(nil, conn4, nil, filepath.Join(user.HomeDir, "file2"), filepath.Join(user.HomeDir, "file2"),
  323. "/file2", TransferDownload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedDLSize: 100})
  324. transfer4.BytesSent.Store(150)
  325. err = Connections.Add(fakeConn4)
  326. assert.NoError(t, err)
  327. Connections.checkTransfers()
  328. assert.Nil(t, transfer3.errAbort)
  329. assert.Nil(t, transfer4.errAbort)
  330. transfer3.BytesSent.Store(512 * 1024)
  331. transfer4.BytesSent.Store(512*1024 + 1)
  332. Connections.checkTransfers()
  333. if assert.Error(t, transfer3.errAbort) {
  334. assert.Contains(t, transfer3.errAbort.Error(), ErrReadQuotaExceeded.Error())
  335. }
  336. if assert.Error(t, transfer4.errAbort) {
  337. assert.Contains(t, transfer4.errAbort.Error(), ErrReadQuotaExceeded.Error())
  338. }
  339. Connections.Remove(fakeConn3.GetID())
  340. Connections.Remove(fakeConn4.GetID())
  341. stats := Connections.GetStats()
  342. assert.Len(t, stats, 0)
  343. err = dataprovider.DeleteUser(user.Username, "", "")
  344. assert.NoError(t, err)
  345. err = os.RemoveAll(user.GetHomeDir())
  346. assert.NoError(t, err)
  347. }
  348. func TestAggregateTransfers(t *testing.T) {
  349. checker := transfersCheckerMem{}
  350. checker.AddTransfer(dataprovider.ActiveTransfer{
  351. ID: 1,
  352. Type: TransferUpload,
  353. ConnID: "1",
  354. Username: "user",
  355. FolderName: "",
  356. TruncatedSize: 0,
  357. CurrentULSize: 100,
  358. CurrentDLSize: 0,
  359. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  360. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  361. })
  362. usersToFetch, aggregations := checker.aggregateUploadTransfers()
  363. assert.Len(t, usersToFetch, 0)
  364. assert.Len(t, aggregations, 1)
  365. checker.AddTransfer(dataprovider.ActiveTransfer{
  366. ID: 1,
  367. Type: TransferDownload,
  368. ConnID: "2",
  369. Username: "user",
  370. FolderName: "",
  371. TruncatedSize: 0,
  372. CurrentULSize: 0,
  373. CurrentDLSize: 100,
  374. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  375. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  376. })
  377. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  378. assert.Len(t, usersToFetch, 0)
  379. assert.Len(t, aggregations, 1)
  380. checker.AddTransfer(dataprovider.ActiveTransfer{
  381. ID: 1,
  382. Type: TransferUpload,
  383. ConnID: "3",
  384. Username: "user",
  385. FolderName: "folder",
  386. TruncatedSize: 0,
  387. CurrentULSize: 10,
  388. CurrentDLSize: 0,
  389. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  390. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  391. })
  392. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  393. assert.Len(t, usersToFetch, 0)
  394. assert.Len(t, aggregations, 2)
  395. checker.AddTransfer(dataprovider.ActiveTransfer{
  396. ID: 1,
  397. Type: TransferUpload,
  398. ConnID: "4",
  399. Username: "user1",
  400. FolderName: "",
  401. TruncatedSize: 0,
  402. CurrentULSize: 100,
  403. CurrentDLSize: 0,
  404. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  405. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  406. })
  407. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  408. assert.Len(t, usersToFetch, 0)
  409. assert.Len(t, aggregations, 3)
  410. checker.AddTransfer(dataprovider.ActiveTransfer{
  411. ID: 1,
  412. Type: TransferUpload,
  413. ConnID: "5",
  414. Username: "user",
  415. FolderName: "",
  416. TruncatedSize: 0,
  417. CurrentULSize: 100,
  418. CurrentDLSize: 0,
  419. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  420. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  421. })
  422. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  423. assert.Len(t, usersToFetch, 1)
  424. val, ok := usersToFetch["user"]
  425. assert.True(t, ok)
  426. assert.False(t, val)
  427. assert.Len(t, aggregations, 3)
  428. aggregate, ok := aggregations[0]
  429. assert.True(t, ok)
  430. assert.Len(t, aggregate, 2)
  431. checker.AddTransfer(dataprovider.ActiveTransfer{
  432. ID: 1,
  433. Type: TransferUpload,
  434. ConnID: "6",
  435. Username: "user",
  436. FolderName: "",
  437. TruncatedSize: 0,
  438. CurrentULSize: 100,
  439. CurrentDLSize: 0,
  440. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  441. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  442. })
  443. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  444. assert.Len(t, usersToFetch, 1)
  445. val, ok = usersToFetch["user"]
  446. assert.True(t, ok)
  447. assert.False(t, val)
  448. assert.Len(t, aggregations, 3)
  449. aggregate, ok = aggregations[0]
  450. assert.True(t, ok)
  451. assert.Len(t, aggregate, 3)
  452. checker.AddTransfer(dataprovider.ActiveTransfer{
  453. ID: 1,
  454. Type: TransferUpload,
  455. ConnID: "7",
  456. Username: "user",
  457. FolderName: "folder",
  458. TruncatedSize: 0,
  459. CurrentULSize: 10,
  460. CurrentDLSize: 0,
  461. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  462. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  463. })
  464. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  465. assert.Len(t, usersToFetch, 1)
  466. val, ok = usersToFetch["user"]
  467. assert.True(t, ok)
  468. assert.True(t, val)
  469. assert.Len(t, aggregations, 3)
  470. aggregate, ok = aggregations[0]
  471. assert.True(t, ok)
  472. assert.Len(t, aggregate, 3)
  473. aggregate, ok = aggregations[1]
  474. assert.True(t, ok)
  475. assert.Len(t, aggregate, 2)
  476. checker.AddTransfer(dataprovider.ActiveTransfer{
  477. ID: 1,
  478. Type: TransferUpload,
  479. ConnID: "8",
  480. Username: "user",
  481. FolderName: "",
  482. TruncatedSize: 0,
  483. CurrentULSize: 100,
  484. CurrentDLSize: 0,
  485. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  486. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  487. })
  488. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  489. assert.Len(t, usersToFetch, 1)
  490. val, ok = usersToFetch["user"]
  491. assert.True(t, ok)
  492. assert.True(t, val)
  493. assert.Len(t, aggregations, 3)
  494. aggregate, ok = aggregations[0]
  495. assert.True(t, ok)
  496. assert.Len(t, aggregate, 4)
  497. aggregate, ok = aggregations[1]
  498. assert.True(t, ok)
  499. assert.Len(t, aggregate, 2)
  500. }
  501. func TestDataTransferExceeded(t *testing.T) {
  502. user := dataprovider.User{
  503. BaseUser: sdk.BaseUser{
  504. TotalDataTransfer: 1,
  505. },
  506. }
  507. transfer := dataprovider.ActiveTransfer{
  508. CurrentULSize: 0,
  509. CurrentDLSize: 0,
  510. }
  511. user.UsedDownloadDataTransfer = 1024 * 1024
  512. user.UsedUploadDataTransfer = 512 * 1024
  513. checker := transfersCheckerMem{}
  514. res := checker.isDataTransferExceeded(user, transfer, 100, 100)
  515. assert.False(t, res)
  516. transfer.CurrentULSize = 1
  517. res = checker.isDataTransferExceeded(user, transfer, 100, 100)
  518. assert.True(t, res)
  519. user.UsedDownloadDataTransfer = 512*1024 - 100
  520. user.UsedUploadDataTransfer = 512*1024 - 100
  521. res = checker.isDataTransferExceeded(user, transfer, 100, 100)
  522. assert.False(t, res)
  523. res = checker.isDataTransferExceeded(user, transfer, 101, 100)
  524. assert.True(t, res)
  525. user.TotalDataTransfer = 0
  526. user.DownloadDataTransfer = 1
  527. user.UsedDownloadDataTransfer = 512 * 1024
  528. transfer.CurrentULSize = 0
  529. transfer.CurrentDLSize = 100
  530. res = checker.isDataTransferExceeded(user, transfer, 0, 512*1024)
  531. assert.False(t, res)
  532. res = checker.isDataTransferExceeded(user, transfer, 0, 512*1024+1)
  533. assert.True(t, res)
  534. user.DownloadDataTransfer = 0
  535. user.UploadDataTransfer = 1
  536. user.UsedUploadDataTransfer = 512 * 1024
  537. transfer.CurrentULSize = 0
  538. transfer.CurrentDLSize = 0
  539. res = checker.isDataTransferExceeded(user, transfer, 512*1024+1, 0)
  540. assert.False(t, res)
  541. transfer.CurrentULSize = 1
  542. res = checker.isDataTransferExceeded(user, transfer, 512*1024+1, 0)
  543. assert.True(t, res)
  544. }
  545. func TestGetUsersForQuotaCheck(t *testing.T) {
  546. usersToFetch := make(map[string]bool)
  547. for i := 0; i < 50; i++ {
  548. usersToFetch[fmt.Sprintf("user%v", i)] = i%2 == 0
  549. }
  550. users, err := dataprovider.GetUsersForQuotaCheck(usersToFetch)
  551. assert.NoError(t, err)
  552. assert.Len(t, users, 0)
  553. for i := 0; i < 40; i++ {
  554. user := dataprovider.User{
  555. BaseUser: sdk.BaseUser{
  556. Username: fmt.Sprintf("user%v", i),
  557. Password: "pwd",
  558. HomeDir: filepath.Join(os.TempDir(), fmt.Sprintf("user%v", i)),
  559. Status: 1,
  560. QuotaSize: 120,
  561. Permissions: map[string][]string{
  562. "/": {dataprovider.PermAny},
  563. },
  564. },
  565. VirtualFolders: []vfs.VirtualFolder{
  566. {
  567. BaseVirtualFolder: vfs.BaseVirtualFolder{
  568. Name: fmt.Sprintf("f%v", i),
  569. MappedPath: filepath.Join(os.TempDir(), fmt.Sprintf("f%v", i)),
  570. },
  571. VirtualPath: "/vfolder",
  572. QuotaSize: 100,
  573. },
  574. },
  575. Filters: dataprovider.UserFilters{
  576. BaseUserFilters: sdk.BaseUserFilters{
  577. DataTransferLimits: []sdk.DataTransferLimit{
  578. {
  579. Sources: []string{"172.16.0.0/16"},
  580. UploadDataTransfer: 50,
  581. DownloadDataTransfer: 80,
  582. },
  583. },
  584. },
  585. },
  586. }
  587. err = dataprovider.AddUser(&user, "", "")
  588. assert.NoError(t, err)
  589. err = dataprovider.UpdateVirtualFolderQuota(&vfs.BaseVirtualFolder{Name: fmt.Sprintf("f%v", i)}, 1, 50, false)
  590. assert.NoError(t, err)
  591. }
  592. users, err = dataprovider.GetUsersForQuotaCheck(usersToFetch)
  593. assert.NoError(t, err)
  594. assert.Len(t, users, 40)
  595. for _, user := range users {
  596. userIdxStr := strings.Replace(user.Username, "user", "", 1)
  597. userIdx, err := strconv.Atoi(userIdxStr)
  598. assert.NoError(t, err)
  599. if userIdx%2 == 0 {
  600. if assert.Len(t, user.VirtualFolders, 1, user.Username) {
  601. assert.Equal(t, int64(100), user.VirtualFolders[0].QuotaSize)
  602. assert.Equal(t, int64(50), user.VirtualFolders[0].UsedQuotaSize)
  603. }
  604. } else {
  605. switch dataprovider.GetProviderStatus().Driver {
  606. case dataprovider.MySQLDataProviderName, dataprovider.PGSQLDataProviderName,
  607. dataprovider.CockroachDataProviderName, dataprovider.SQLiteDataProviderName:
  608. assert.Len(t, user.VirtualFolders, 0, user.Username)
  609. }
  610. }
  611. ul, dl, total := user.GetDataTransferLimits("127.1.1.1")
  612. assert.Equal(t, int64(0), ul)
  613. assert.Equal(t, int64(0), dl)
  614. assert.Equal(t, int64(0), total)
  615. ul, dl, total = user.GetDataTransferLimits("172.16.2.3")
  616. assert.Equal(t, int64(50*1024*1024), ul)
  617. assert.Equal(t, int64(80*1024*1024), dl)
  618. assert.Equal(t, int64(0), total)
  619. }
  620. for i := 0; i < 40; i++ {
  621. err = dataprovider.DeleteUser(fmt.Sprintf("user%v", i), "", "")
  622. assert.NoError(t, err)
  623. err = dataprovider.DeleteFolder(fmt.Sprintf("f%v", i), "", "")
  624. assert.NoError(t, err)
  625. }
  626. users, err = dataprovider.GetUsersForQuotaCheck(usersToFetch)
  627. assert.NoError(t, err)
  628. assert.Len(t, users, 0)
  629. }
  630. func TestDBTransferChecker(t *testing.T) {
  631. if !isDbTransferCheckerSupported() {
  632. t.Skip("this test is not supported with the current database provider")
  633. }
  634. providerConf := dataprovider.GetProviderConfig()
  635. err := dataprovider.Close()
  636. assert.NoError(t, err)
  637. providerConf.IsShared = 1
  638. err = dataprovider.Initialize(providerConf, configDir, true)
  639. assert.NoError(t, err)
  640. c := getTransfersChecker(1)
  641. checker, ok := c.(*transfersCheckerDB)
  642. assert.True(t, ok)
  643. assert.True(t, checker.lastCleanup.IsZero())
  644. transfer1 := dataprovider.ActiveTransfer{
  645. ID: 1,
  646. Type: TransferDownload,
  647. ConnID: xid.New().String(),
  648. Username: "user1",
  649. FolderName: "folder1",
  650. IP: "127.0.0.1",
  651. }
  652. checker.AddTransfer(transfer1)
  653. transfers, err := dataprovider.GetActiveTransfers(time.Now().Add(24 * time.Hour))
  654. assert.NoError(t, err)
  655. assert.Len(t, transfers, 0)
  656. transfers, err = dataprovider.GetActiveTransfers(time.Now().Add(-periodicTimeoutCheckInterval * 2))
  657. assert.NoError(t, err)
  658. var createdAt, updatedAt int64
  659. if assert.Len(t, transfers, 1) {
  660. transfer := transfers[0]
  661. assert.Equal(t, transfer1.ID, transfer.ID)
  662. assert.Equal(t, transfer1.Type, transfer.Type)
  663. assert.Equal(t, transfer1.ConnID, transfer.ConnID)
  664. assert.Equal(t, transfer1.Username, transfer.Username)
  665. assert.Equal(t, transfer1.IP, transfer.IP)
  666. assert.Equal(t, transfer1.FolderName, transfer.FolderName)
  667. assert.Greater(t, transfer.CreatedAt, int64(0))
  668. assert.Greater(t, transfer.UpdatedAt, int64(0))
  669. assert.Equal(t, int64(0), transfer.CurrentDLSize)
  670. assert.Equal(t, int64(0), transfer.CurrentULSize)
  671. createdAt = transfer.CreatedAt
  672. updatedAt = transfer.UpdatedAt
  673. }
  674. time.Sleep(100 * time.Millisecond)
  675. checker.UpdateTransferCurrentSizes(100, 150, transfer1.ID, transfer1.ConnID)
  676. transfers, err = dataprovider.GetActiveTransfers(time.Now().Add(-periodicTimeoutCheckInterval * 2))
  677. assert.NoError(t, err)
  678. if assert.Len(t, transfers, 1) {
  679. transfer := transfers[0]
  680. assert.Equal(t, int64(150), transfer.CurrentDLSize)
  681. assert.Equal(t, int64(100), transfer.CurrentULSize)
  682. assert.Equal(t, createdAt, transfer.CreatedAt)
  683. assert.Greater(t, transfer.UpdatedAt, updatedAt)
  684. }
  685. res := checker.GetOverquotaTransfers()
  686. assert.Len(t, res, 0)
  687. checker.RemoveTransfer(transfer1.ID, transfer1.ConnID)
  688. transfers, err = dataprovider.GetActiveTransfers(time.Now().Add(-periodicTimeoutCheckInterval * 2))
  689. assert.NoError(t, err)
  690. assert.Len(t, transfers, 0)
  691. err = dataprovider.Close()
  692. assert.NoError(t, err)
  693. res = checker.GetOverquotaTransfers()
  694. assert.Len(t, res, 0)
  695. providerConf.IsShared = 0
  696. err = dataprovider.Initialize(providerConf, configDir, true)
  697. assert.NoError(t, err)
  698. }
  699. func isDbTransferCheckerSupported() bool {
  700. // SQLite shares the implementation with other SQL-based provider but it makes no sense
  701. // to use it outside test cases
  702. switch dataprovider.GetProviderStatus().Driver {
  703. case dataprovider.MySQLDataProviderName, dataprovider.PGSQLDataProviderName,
  704. dataprovider.CockroachDataProviderName, dataprovider.SQLiteDataProviderName:
  705. return true
  706. default:
  707. return false
  708. }
  709. }