transferschecker_test.go 25 KB


  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "fmt"
  17. "os"
  18. "path"
  19. "path/filepath"
  20. "strconv"
  21. "strings"
  22. "sync/atomic"
  23. "testing"
  24. "time"
  25. "github.com/rs/xid"
  26. "github.com/sftpgo/sdk"
  27. "github.com/stretchr/testify/assert"
  28. "github.com/drakkan/sftpgo/v2/dataprovider"
  29. "github.com/drakkan/sftpgo/v2/util"
  30. "github.com/drakkan/sftpgo/v2/vfs"
  31. )
  32. func TestTransfersCheckerDiskQuota(t *testing.T) {
  33. username := "transfers_check_username"
  34. folderName := "test_transfers_folder"
  35. groupName := "test_transfers_group"
  36. vdirPath := "/vdir"
  37. group := dataprovider.Group{
  38. BaseGroup: sdk.BaseGroup{
  39. Name: groupName,
  40. },
  41. UserSettings: dataprovider.GroupUserSettings{
  42. BaseGroupUserSettings: sdk.BaseGroupUserSettings{
  43. QuotaSize: 120,
  44. },
  45. },
  46. }
  47. user := dataprovider.User{
  48. BaseUser: sdk.BaseUser{
  49. Username: username,
  50. Password: "testpwd",
  51. HomeDir: filepath.Join(os.TempDir(), username),
  52. Status: 1,
  53. QuotaSize: 0, // the quota size defined for the group is used
  54. Permissions: map[string][]string{
  55. "/": {dataprovider.PermAny},
  56. },
  57. },
  58. VirtualFolders: []vfs.VirtualFolder{
  59. {
  60. BaseVirtualFolder: vfs.BaseVirtualFolder{
  61. Name: folderName,
  62. MappedPath: filepath.Join(os.TempDir(), folderName),
  63. },
  64. VirtualPath: vdirPath,
  65. QuotaSize: 100,
  66. },
  67. },
  68. Groups: []sdk.GroupMapping{
  69. {
  70. Name: groupName,
  71. Type: sdk.GroupTypePrimary,
  72. },
  73. },
  74. }
  75. err := dataprovider.AddGroup(&group, "", "")
  76. assert.NoError(t, err)
  77. group, err = dataprovider.GroupExists(groupName)
  78. assert.NoError(t, err)
  79. assert.Equal(t, int64(120), group.UserSettings.QuotaSize)
  80. err = dataprovider.AddUser(&user, "", "")
  81. assert.NoError(t, err)
  82. user, err = dataprovider.GetUserWithGroupSettings(username)
  83. assert.NoError(t, err)
  84. connID1 := xid.New().String()
  85. fsUser, err := user.GetFilesystemForPath("/file1", connID1)
  86. assert.NoError(t, err)
  87. conn1 := NewBaseConnection(connID1, ProtocolSFTP, "", "", user)
  88. fakeConn1 := &fakeConnection{
  89. BaseConnection: conn1,
  90. }
  91. transfer1 := NewBaseTransfer(nil, conn1, nil, filepath.Join(user.HomeDir, "file1"), filepath.Join(user.HomeDir, "file1"),
  92. "/file1", TransferUpload, 0, 0, 120, 0, true, fsUser, dataprovider.TransferQuota{})
  93. transfer1.BytesReceived = 150
  94. err = Connections.Add(fakeConn1)
  95. assert.NoError(t, err)
  96. // the transferschecker will do nothing if there is only one ongoing transfer
  97. Connections.checkTransfers()
  98. assert.Nil(t, transfer1.errAbort)
  99. connID2 := xid.New().String()
  100. conn2 := NewBaseConnection(connID2, ProtocolSFTP, "", "", user)
  101. fakeConn2 := &fakeConnection{
  102. BaseConnection: conn2,
  103. }
  104. transfer2 := NewBaseTransfer(nil, conn2, nil, filepath.Join(user.HomeDir, "file2"), filepath.Join(user.HomeDir, "file2"),
  105. "/file2", TransferUpload, 0, 0, 120, 40, true, fsUser, dataprovider.TransferQuota{})
  106. transfer1.BytesReceived = 50
  107. transfer2.BytesReceived = 60
  108. err = Connections.Add(fakeConn2)
  109. assert.NoError(t, err)
  110. connID3 := xid.New().String()
  111. conn3 := NewBaseConnection(connID3, ProtocolSFTP, "", "", user)
  112. fakeConn3 := &fakeConnection{
  113. BaseConnection: conn3,
  114. }
  115. transfer3 := NewBaseTransfer(nil, conn3, nil, filepath.Join(user.HomeDir, "file3"), filepath.Join(user.HomeDir, "file3"),
  116. "/file3", TransferDownload, 0, 0, 120, 0, true, fsUser, dataprovider.TransferQuota{})
  117. transfer3.BytesReceived = 60 // this value will be ignored, this is a download
  118. err = Connections.Add(fakeConn3)
  119. assert.NoError(t, err)
  120. // the transfers are not overquota
  121. Connections.checkTransfers()
  122. assert.Nil(t, transfer1.errAbort)
  123. assert.Nil(t, transfer2.errAbort)
  124. assert.Nil(t, transfer3.errAbort)
  125. transfer1.BytesReceived = 80 // truncated size will be subtracted, we are not overquota
  126. Connections.checkTransfers()
  127. assert.Nil(t, transfer1.errAbort)
  128. assert.Nil(t, transfer2.errAbort)
  129. assert.Nil(t, transfer3.errAbort)
  130. transfer1.BytesReceived = 120
  131. // we are now overquota
  132. // if another check is in progress nothing is done
  133. atomic.StoreInt32(&Connections.transfersCheckStatus, 1)
  134. Connections.checkTransfers()
  135. assert.Nil(t, transfer1.errAbort)
  136. assert.Nil(t, transfer2.errAbort)
  137. assert.Nil(t, transfer3.errAbort)
  138. atomic.StoreInt32(&Connections.transfersCheckStatus, 0)
  139. Connections.checkTransfers()
  140. assert.True(t, conn1.IsQuotaExceededError(transfer1.errAbort), transfer1.errAbort)
  141. assert.True(t, conn2.IsQuotaExceededError(transfer2.errAbort), transfer2.errAbort)
  142. assert.True(t, conn1.IsQuotaExceededError(transfer1.GetAbortError()))
  143. assert.Nil(t, transfer3.errAbort)
  144. assert.True(t, conn3.IsQuotaExceededError(transfer3.GetAbortError()))
  145. // update the user quota size
  146. group.UserSettings.QuotaSize = 1000
  147. err = dataprovider.UpdateGroup(&group, []string{username}, "", "")
  148. assert.NoError(t, err)
  149. transfer1.errAbort = nil
  150. transfer2.errAbort = nil
  151. Connections.checkTransfers()
  152. assert.Nil(t, transfer1.errAbort)
  153. assert.Nil(t, transfer2.errAbort)
  154. assert.Nil(t, transfer3.errAbort)
  155. group.UserSettings.QuotaSize = 0
  156. err = dataprovider.UpdateGroup(&group, []string{username}, "", "")
  157. assert.NoError(t, err)
  158. Connections.checkTransfers()
  159. assert.Nil(t, transfer1.errAbort)
  160. assert.Nil(t, transfer2.errAbort)
  161. assert.Nil(t, transfer3.errAbort)
  162. // now check a public folder
  163. transfer1.BytesReceived = 0
  164. transfer2.BytesReceived = 0
  165. connID4 := xid.New().String()
  166. fsFolder, err := user.GetFilesystemForPath(path.Join(vdirPath, "/file1"), connID4)
  167. assert.NoError(t, err)
  168. conn4 := NewBaseConnection(connID4, ProtocolSFTP, "", "", user)
  169. fakeConn4 := &fakeConnection{
  170. BaseConnection: conn4,
  171. }
  172. transfer4 := NewBaseTransfer(nil, conn4, nil, filepath.Join(os.TempDir(), folderName, "file1"),
  173. filepath.Join(os.TempDir(), folderName, "file1"), path.Join(vdirPath, "/file1"), TransferUpload, 0, 0,
  174. 100, 0, true, fsFolder, dataprovider.TransferQuota{})
  175. err = Connections.Add(fakeConn4)
  176. assert.NoError(t, err)
  177. connID5 := xid.New().String()
  178. conn5 := NewBaseConnection(connID5, ProtocolSFTP, "", "", user)
  179. fakeConn5 := &fakeConnection{
  180. BaseConnection: conn5,
  181. }
  182. transfer5 := NewBaseTransfer(nil, conn5, nil, filepath.Join(os.TempDir(), folderName, "file2"),
  183. filepath.Join(os.TempDir(), folderName, "file2"), path.Join(vdirPath, "/file2"), TransferUpload, 0, 0,
  184. 100, 0, true, fsFolder, dataprovider.TransferQuota{})
  185. err = Connections.Add(fakeConn5)
  186. assert.NoError(t, err)
  187. transfer4.BytesReceived = 50
  188. transfer5.BytesReceived = 40
  189. Connections.checkTransfers()
  190. assert.Nil(t, transfer4.errAbort)
  191. assert.Nil(t, transfer5.errAbort)
  192. transfer5.BytesReceived = 60
  193. Connections.checkTransfers()
  194. assert.Nil(t, transfer1.errAbort)
  195. assert.Nil(t, transfer2.errAbort)
  196. assert.Nil(t, transfer3.errAbort)
  197. assert.True(t, conn1.IsQuotaExceededError(transfer4.errAbort))
  198. assert.True(t, conn2.IsQuotaExceededError(transfer5.errAbort))
  199. if dataprovider.GetProviderStatus().Driver != dataprovider.MemoryDataProviderName {
  200. providerConf := dataprovider.GetProviderConfig()
  201. err = dataprovider.Close()
  202. assert.NoError(t, err)
  203. transfer4.errAbort = nil
  204. transfer5.errAbort = nil
  205. Connections.checkTransfers()
  206. assert.Nil(t, transfer1.errAbort)
  207. assert.Nil(t, transfer2.errAbort)
  208. assert.Nil(t, transfer3.errAbort)
  209. assert.Nil(t, transfer4.errAbort)
  210. assert.Nil(t, transfer5.errAbort)
  211. err = dataprovider.Initialize(providerConf, configDir, true)
  212. assert.NoError(t, err)
  213. }
  214. err = transfer1.Close()
  215. assert.NoError(t, err)
  216. err = transfer2.Close()
  217. assert.NoError(t, err)
  218. err = transfer3.Close()
  219. assert.NoError(t, err)
  220. err = transfer4.Close()
  221. assert.NoError(t, err)
  222. err = transfer5.Close()
  223. assert.NoError(t, err)
  224. Connections.Remove(fakeConn1.GetID())
  225. Connections.Remove(fakeConn2.GetID())
  226. Connections.Remove(fakeConn3.GetID())
  227. Connections.Remove(fakeConn4.GetID())
  228. Connections.Remove(fakeConn5.GetID())
  229. stats := Connections.GetStats()
  230. assert.Len(t, stats, 0)
  231. err = dataprovider.DeleteUser(user.Username, "", "")
  232. assert.NoError(t, err)
  233. err = os.RemoveAll(user.GetHomeDir())
  234. assert.NoError(t, err)
  235. err = dataprovider.DeleteFolder(folderName, "", "")
  236. assert.NoError(t, err)
  237. err = os.RemoveAll(filepath.Join(os.TempDir(), folderName))
  238. assert.NoError(t, err)
  239. err = dataprovider.DeleteGroup(groupName, "", "")
  240. assert.NoError(t, err)
  241. }
  242. func TestTransferCheckerTransferQuota(t *testing.T) {
  243. username := "transfers_check_username"
  244. user := dataprovider.User{
  245. BaseUser: sdk.BaseUser{
  246. Username: username,
  247. Password: "test_pwd",
  248. HomeDir: filepath.Join(os.TempDir(), username),
  249. Status: 1,
  250. TotalDataTransfer: 1,
  251. Permissions: map[string][]string{
  252. "/": {dataprovider.PermAny},
  253. },
  254. },
  255. }
  256. err := dataprovider.AddUser(&user, "", "")
  257. assert.NoError(t, err)
  258. connID1 := xid.New().String()
  259. fsUser, err := user.GetFilesystemForPath("/file1", connID1)
  260. assert.NoError(t, err)
  261. conn1 := NewBaseConnection(connID1, ProtocolSFTP, "", "192.168.1.1", user)
  262. fakeConn1 := &fakeConnection{
  263. BaseConnection: conn1,
  264. }
  265. transfer1 := NewBaseTransfer(nil, conn1, nil, filepath.Join(user.HomeDir, "file1"), filepath.Join(user.HomeDir, "file1"),
  266. "/file1", TransferUpload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedTotalSize: 100})
  267. transfer1.BytesReceived = 150
  268. err = Connections.Add(fakeConn1)
  269. assert.NoError(t, err)
  270. // the transferschecker will do nothing if there is only one ongoing transfer
  271. Connections.checkTransfers()
  272. assert.Nil(t, transfer1.errAbort)
  273. connID2 := xid.New().String()
  274. conn2 := NewBaseConnection(connID2, ProtocolSFTP, "", "127.0.0.1", user)
  275. fakeConn2 := &fakeConnection{
  276. BaseConnection: conn2,
  277. }
  278. transfer2 := NewBaseTransfer(nil, conn2, nil, filepath.Join(user.HomeDir, "file2"), filepath.Join(user.HomeDir, "file2"),
  279. "/file2", TransferUpload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedTotalSize: 100})
  280. transfer2.BytesReceived = 150
  281. err = Connections.Add(fakeConn2)
  282. assert.NoError(t, err)
  283. Connections.checkTransfers()
  284. assert.Nil(t, transfer1.errAbort)
  285. assert.Nil(t, transfer2.errAbort)
  286. // now test overquota
  287. transfer1.BytesReceived = 1024*1024 + 1
  288. transfer2.BytesReceived = 0
  289. Connections.checkTransfers()
  290. assert.True(t, conn1.IsQuotaExceededError(transfer1.errAbort))
  291. assert.Nil(t, transfer2.errAbort)
  292. transfer1.errAbort = nil
  293. transfer1.BytesReceived = 1024*1024 + 1
  294. transfer2.BytesReceived = 1024
  295. Connections.checkTransfers()
  296. assert.True(t, conn1.IsQuotaExceededError(transfer1.errAbort))
  297. assert.True(t, conn2.IsQuotaExceededError(transfer2.errAbort))
  298. transfer1.BytesReceived = 0
  299. transfer2.BytesReceived = 0
  300. transfer1.errAbort = nil
  301. transfer2.errAbort = nil
  302. err = transfer1.Close()
  303. assert.NoError(t, err)
  304. err = transfer2.Close()
  305. assert.NoError(t, err)
  306. Connections.Remove(fakeConn1.GetID())
  307. Connections.Remove(fakeConn2.GetID())
  308. connID3 := xid.New().String()
  309. conn3 := NewBaseConnection(connID3, ProtocolSFTP, "", "", user)
  310. fakeConn3 := &fakeConnection{
  311. BaseConnection: conn3,
  312. }
  313. transfer3 := NewBaseTransfer(nil, conn3, nil, filepath.Join(user.HomeDir, "file1"), filepath.Join(user.HomeDir, "file1"),
  314. "/file1", TransferDownload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedDLSize: 100})
  315. transfer3.BytesSent = 150
  316. err = Connections.Add(fakeConn3)
  317. assert.NoError(t, err)
  318. connID4 := xid.New().String()
  319. conn4 := NewBaseConnection(connID4, ProtocolSFTP, "", "", user)
  320. fakeConn4 := &fakeConnection{
  321. BaseConnection: conn4,
  322. }
  323. transfer4 := NewBaseTransfer(nil, conn4, nil, filepath.Join(user.HomeDir, "file2"), filepath.Join(user.HomeDir, "file2"),
  324. "/file2", TransferDownload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedDLSize: 100})
  325. transfer4.BytesSent = 150
  326. err = Connections.Add(fakeConn4)
  327. assert.NoError(t, err)
  328. Connections.checkTransfers()
  329. assert.Nil(t, transfer3.errAbort)
  330. assert.Nil(t, transfer4.errAbort)
  331. transfer3.BytesSent = 512 * 1024
  332. transfer4.BytesSent = 512*1024 + 1
  333. Connections.checkTransfers()
  334. if assert.Error(t, transfer3.errAbort) {
  335. assert.Contains(t, transfer3.errAbort.Error(), ErrReadQuotaExceeded.Error())
  336. }
  337. if assert.Error(t, transfer4.errAbort) {
  338. assert.Contains(t, transfer4.errAbort.Error(), ErrReadQuotaExceeded.Error())
  339. }
  340. Connections.Remove(fakeConn3.GetID())
  341. Connections.Remove(fakeConn4.GetID())
  342. stats := Connections.GetStats()
  343. assert.Len(t, stats, 0)
  344. err = dataprovider.DeleteUser(user.Username, "", "")
  345. assert.NoError(t, err)
  346. err = os.RemoveAll(user.GetHomeDir())
  347. assert.NoError(t, err)
  348. }
  349. func TestAggregateTransfers(t *testing.T) {
  350. checker := transfersCheckerMem{}
  351. checker.AddTransfer(dataprovider.ActiveTransfer{
  352. ID: 1,
  353. Type: TransferUpload,
  354. ConnID: "1",
  355. Username: "user",
  356. FolderName: "",
  357. TruncatedSize: 0,
  358. CurrentULSize: 100,
  359. CurrentDLSize: 0,
  360. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  361. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  362. })
  363. usersToFetch, aggregations := checker.aggregateUploadTransfers()
  364. assert.Len(t, usersToFetch, 0)
  365. assert.Len(t, aggregations, 1)
  366. checker.AddTransfer(dataprovider.ActiveTransfer{
  367. ID: 1,
  368. Type: TransferDownload,
  369. ConnID: "2",
  370. Username: "user",
  371. FolderName: "",
  372. TruncatedSize: 0,
  373. CurrentULSize: 0,
  374. CurrentDLSize: 100,
  375. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  376. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  377. })
  378. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  379. assert.Len(t, usersToFetch, 0)
  380. assert.Len(t, aggregations, 1)
  381. checker.AddTransfer(dataprovider.ActiveTransfer{
  382. ID: 1,
  383. Type: TransferUpload,
  384. ConnID: "3",
  385. Username: "user",
  386. FolderName: "folder",
  387. TruncatedSize: 0,
  388. CurrentULSize: 10,
  389. CurrentDLSize: 0,
  390. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  391. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  392. })
  393. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  394. assert.Len(t, usersToFetch, 0)
  395. assert.Len(t, aggregations, 2)
  396. checker.AddTransfer(dataprovider.ActiveTransfer{
  397. ID: 1,
  398. Type: TransferUpload,
  399. ConnID: "4",
  400. Username: "user1",
  401. FolderName: "",
  402. TruncatedSize: 0,
  403. CurrentULSize: 100,
  404. CurrentDLSize: 0,
  405. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  406. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  407. })
  408. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  409. assert.Len(t, usersToFetch, 0)
  410. assert.Len(t, aggregations, 3)
  411. checker.AddTransfer(dataprovider.ActiveTransfer{
  412. ID: 1,
  413. Type: TransferUpload,
  414. ConnID: "5",
  415. Username: "user",
  416. FolderName: "",
  417. TruncatedSize: 0,
  418. CurrentULSize: 100,
  419. CurrentDLSize: 0,
  420. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  421. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  422. })
  423. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  424. assert.Len(t, usersToFetch, 1)
  425. val, ok := usersToFetch["user"]
  426. assert.True(t, ok)
  427. assert.False(t, val)
  428. assert.Len(t, aggregations, 3)
  429. aggregate, ok := aggregations[0]
  430. assert.True(t, ok)
  431. assert.Len(t, aggregate, 2)
  432. checker.AddTransfer(dataprovider.ActiveTransfer{
  433. ID: 1,
  434. Type: TransferUpload,
  435. ConnID: "6",
  436. Username: "user",
  437. FolderName: "",
  438. TruncatedSize: 0,
  439. CurrentULSize: 100,
  440. CurrentDLSize: 0,
  441. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  442. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  443. })
  444. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  445. assert.Len(t, usersToFetch, 1)
  446. val, ok = usersToFetch["user"]
  447. assert.True(t, ok)
  448. assert.False(t, val)
  449. assert.Len(t, aggregations, 3)
  450. aggregate, ok = aggregations[0]
  451. assert.True(t, ok)
  452. assert.Len(t, aggregate, 3)
  453. checker.AddTransfer(dataprovider.ActiveTransfer{
  454. ID: 1,
  455. Type: TransferUpload,
  456. ConnID: "7",
  457. Username: "user",
  458. FolderName: "folder",
  459. TruncatedSize: 0,
  460. CurrentULSize: 10,
  461. CurrentDLSize: 0,
  462. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  463. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  464. })
  465. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  466. assert.Len(t, usersToFetch, 1)
  467. val, ok = usersToFetch["user"]
  468. assert.True(t, ok)
  469. assert.True(t, val)
  470. assert.Len(t, aggregations, 3)
  471. aggregate, ok = aggregations[0]
  472. assert.True(t, ok)
  473. assert.Len(t, aggregate, 3)
  474. aggregate, ok = aggregations[1]
  475. assert.True(t, ok)
  476. assert.Len(t, aggregate, 2)
  477. checker.AddTransfer(dataprovider.ActiveTransfer{
  478. ID: 1,
  479. Type: TransferUpload,
  480. ConnID: "8",
  481. Username: "user",
  482. FolderName: "",
  483. TruncatedSize: 0,
  484. CurrentULSize: 100,
  485. CurrentDLSize: 0,
  486. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  487. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  488. })
  489. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  490. assert.Len(t, usersToFetch, 1)
  491. val, ok = usersToFetch["user"]
  492. assert.True(t, ok)
  493. assert.True(t, val)
  494. assert.Len(t, aggregations, 3)
  495. aggregate, ok = aggregations[0]
  496. assert.True(t, ok)
  497. assert.Len(t, aggregate, 4)
  498. aggregate, ok = aggregations[1]
  499. assert.True(t, ok)
  500. assert.Len(t, aggregate, 2)
  501. }
  502. func TestDataTransferExceeded(t *testing.T) {
  503. user := dataprovider.User{
  504. BaseUser: sdk.BaseUser{
  505. TotalDataTransfer: 1,
  506. },
  507. }
  508. transfer := dataprovider.ActiveTransfer{
  509. CurrentULSize: 0,
  510. CurrentDLSize: 0,
  511. }
  512. user.UsedDownloadDataTransfer = 1024 * 1024
  513. user.UsedUploadDataTransfer = 512 * 1024
  514. checker := transfersCheckerMem{}
  515. res := checker.isDataTransferExceeded(user, transfer, 100, 100)
  516. assert.False(t, res)
  517. transfer.CurrentULSize = 1
  518. res = checker.isDataTransferExceeded(user, transfer, 100, 100)
  519. assert.True(t, res)
  520. user.UsedDownloadDataTransfer = 512*1024 - 100
  521. user.UsedUploadDataTransfer = 512*1024 - 100
  522. res = checker.isDataTransferExceeded(user, transfer, 100, 100)
  523. assert.False(t, res)
  524. res = checker.isDataTransferExceeded(user, transfer, 101, 100)
  525. assert.True(t, res)
  526. user.TotalDataTransfer = 0
  527. user.DownloadDataTransfer = 1
  528. user.UsedDownloadDataTransfer = 512 * 1024
  529. transfer.CurrentULSize = 0
  530. transfer.CurrentDLSize = 100
  531. res = checker.isDataTransferExceeded(user, transfer, 0, 512*1024)
  532. assert.False(t, res)
  533. res = checker.isDataTransferExceeded(user, transfer, 0, 512*1024+1)
  534. assert.True(t, res)
  535. user.DownloadDataTransfer = 0
  536. user.UploadDataTransfer = 1
  537. user.UsedUploadDataTransfer = 512 * 1024
  538. transfer.CurrentULSize = 0
  539. transfer.CurrentDLSize = 0
  540. res = checker.isDataTransferExceeded(user, transfer, 512*1024+1, 0)
  541. assert.False(t, res)
  542. transfer.CurrentULSize = 1
  543. res = checker.isDataTransferExceeded(user, transfer, 512*1024+1, 0)
  544. assert.True(t, res)
  545. }
  546. func TestGetUsersForQuotaCheck(t *testing.T) {
  547. usersToFetch := make(map[string]bool)
  548. for i := 0; i < 50; i++ {
  549. usersToFetch[fmt.Sprintf("user%v", i)] = i%2 == 0
  550. }
  551. users, err := dataprovider.GetUsersForQuotaCheck(usersToFetch)
  552. assert.NoError(t, err)
  553. assert.Len(t, users, 0)
  554. for i := 0; i < 40; i++ {
  555. user := dataprovider.User{
  556. BaseUser: sdk.BaseUser{
  557. Username: fmt.Sprintf("user%v", i),
  558. Password: "pwd",
  559. HomeDir: filepath.Join(os.TempDir(), fmt.Sprintf("user%v", i)),
  560. Status: 1,
  561. QuotaSize: 120,
  562. Permissions: map[string][]string{
  563. "/": {dataprovider.PermAny},
  564. },
  565. },
  566. VirtualFolders: []vfs.VirtualFolder{
  567. {
  568. BaseVirtualFolder: vfs.BaseVirtualFolder{
  569. Name: fmt.Sprintf("f%v", i),
  570. MappedPath: filepath.Join(os.TempDir(), fmt.Sprintf("f%v", i)),
  571. },
  572. VirtualPath: "/vfolder",
  573. QuotaSize: 100,
  574. },
  575. },
  576. Filters: dataprovider.UserFilters{
  577. BaseUserFilters: sdk.BaseUserFilters{
  578. DataTransferLimits: []sdk.DataTransferLimit{
  579. {
  580. Sources: []string{"172.16.0.0/16"},
  581. UploadDataTransfer: 50,
  582. DownloadDataTransfer: 80,
  583. },
  584. },
  585. },
  586. },
  587. }
  588. err = dataprovider.AddUser(&user, "", "")
  589. assert.NoError(t, err)
  590. err = dataprovider.UpdateVirtualFolderQuota(&vfs.BaseVirtualFolder{Name: fmt.Sprintf("f%v", i)}, 1, 50, false)
  591. assert.NoError(t, err)
  592. }
  593. users, err = dataprovider.GetUsersForQuotaCheck(usersToFetch)
  594. assert.NoError(t, err)
  595. assert.Len(t, users, 40)
  596. for _, user := range users {
  597. userIdxStr := strings.Replace(user.Username, "user", "", 1)
  598. userIdx, err := strconv.Atoi(userIdxStr)
  599. assert.NoError(t, err)
  600. if userIdx%2 == 0 {
  601. if assert.Len(t, user.VirtualFolders, 1, user.Username) {
  602. assert.Equal(t, int64(100), user.VirtualFolders[0].QuotaSize)
  603. assert.Equal(t, int64(50), user.VirtualFolders[0].UsedQuotaSize)
  604. }
  605. } else {
  606. switch dataprovider.GetProviderStatus().Driver {
  607. case dataprovider.MySQLDataProviderName, dataprovider.PGSQLDataProviderName,
  608. dataprovider.CockroachDataProviderName, dataprovider.SQLiteDataProviderName:
  609. assert.Len(t, user.VirtualFolders, 0, user.Username)
  610. }
  611. }
  612. ul, dl, total := user.GetDataTransferLimits("127.1.1.1")
  613. assert.Equal(t, int64(0), ul)
  614. assert.Equal(t, int64(0), dl)
  615. assert.Equal(t, int64(0), total)
  616. ul, dl, total = user.GetDataTransferLimits("172.16.2.3")
  617. assert.Equal(t, int64(50*1024*1024), ul)
  618. assert.Equal(t, int64(80*1024*1024), dl)
  619. assert.Equal(t, int64(0), total)
  620. }
  621. for i := 0; i < 40; i++ {
  622. err = dataprovider.DeleteUser(fmt.Sprintf("user%v", i), "", "")
  623. assert.NoError(t, err)
  624. err = dataprovider.DeleteFolder(fmt.Sprintf("f%v", i), "", "")
  625. assert.NoError(t, err)
  626. }
  627. users, err = dataprovider.GetUsersForQuotaCheck(usersToFetch)
  628. assert.NoError(t, err)
  629. assert.Len(t, users, 0)
  630. }
  631. func TestDBTransferChecker(t *testing.T) {
  632. if !isDbTransferCheckerSupported() {
  633. t.Skip("this test is not supported with the current database provider")
  634. }
  635. providerConf := dataprovider.GetProviderConfig()
  636. err := dataprovider.Close()
  637. assert.NoError(t, err)
  638. providerConf.IsShared = 1
  639. err = dataprovider.Initialize(providerConf, configDir, true)
  640. assert.NoError(t, err)
  641. c := getTransfersChecker(1)
  642. checker, ok := c.(*transfersCheckerDB)
  643. assert.True(t, ok)
  644. assert.True(t, checker.lastCleanup.IsZero())
  645. transfer1 := dataprovider.ActiveTransfer{
  646. ID: 1,
  647. Type: TransferDownload,
  648. ConnID: xid.New().String(),
  649. Username: "user1",
  650. FolderName: "folder1",
  651. IP: "127.0.0.1",
  652. }
  653. checker.AddTransfer(transfer1)
  654. transfers, err := dataprovider.GetActiveTransfers(time.Now().Add(24 * time.Hour))
  655. assert.NoError(t, err)
  656. assert.Len(t, transfers, 0)
  657. transfers, err = dataprovider.GetActiveTransfers(time.Now().Add(-periodicTimeoutCheckInterval * 2))
  658. assert.NoError(t, err)
  659. var createdAt, updatedAt int64
  660. if assert.Len(t, transfers, 1) {
  661. transfer := transfers[0]
  662. assert.Equal(t, transfer1.ID, transfer.ID)
  663. assert.Equal(t, transfer1.Type, transfer.Type)
  664. assert.Equal(t, transfer1.ConnID, transfer.ConnID)
  665. assert.Equal(t, transfer1.Username, transfer.Username)
  666. assert.Equal(t, transfer1.IP, transfer.IP)
  667. assert.Equal(t, transfer1.FolderName, transfer.FolderName)
  668. assert.Greater(t, transfer.CreatedAt, int64(0))
  669. assert.Greater(t, transfer.UpdatedAt, int64(0))
  670. assert.Equal(t, int64(0), transfer.CurrentDLSize)
  671. assert.Equal(t, int64(0), transfer.CurrentULSize)
  672. createdAt = transfer.CreatedAt
  673. updatedAt = transfer.UpdatedAt
  674. }
  675. time.Sleep(100 * time.Millisecond)
  676. checker.UpdateTransferCurrentSizes(100, 150, transfer1.ID, transfer1.ConnID)
  677. transfers, err = dataprovider.GetActiveTransfers(time.Now().Add(-periodicTimeoutCheckInterval * 2))
  678. assert.NoError(t, err)
  679. if assert.Len(t, transfers, 1) {
  680. transfer := transfers[0]
  681. assert.Equal(t, int64(150), transfer.CurrentDLSize)
  682. assert.Equal(t, int64(100), transfer.CurrentULSize)
  683. assert.Equal(t, createdAt, transfer.CreatedAt)
  684. assert.Greater(t, transfer.UpdatedAt, updatedAt)
  685. }
  686. res := checker.GetOverquotaTransfers()
  687. assert.Len(t, res, 0)
  688. checker.RemoveTransfer(transfer1.ID, transfer1.ConnID)
  689. transfers, err = dataprovider.GetActiveTransfers(time.Now().Add(-periodicTimeoutCheckInterval * 2))
  690. assert.NoError(t, err)
  691. assert.Len(t, transfers, 0)
  692. err = dataprovider.Close()
  693. assert.NoError(t, err)
  694. res = checker.GetOverquotaTransfers()
  695. assert.Len(t, res, 0)
  696. providerConf.IsShared = 0
  697. err = dataprovider.Initialize(providerConf, configDir, true)
  698. assert.NoError(t, err)
  699. }
  700. func isDbTransferCheckerSupported() bool {
  701. // SQLite shares the implementation with other SQL-based provider but it makes no sense
  702. // to use it outside test cases
  703. switch dataprovider.GetProviderStatus().Driver {
  704. case dataprovider.MySQLDataProviderName, dataprovider.PGSQLDataProviderName,
  705. dataprovider.CockroachDataProviderName, dataprovider.SQLiteDataProviderName:
  706. return true
  707. default:
  708. return false
  709. }
  710. }