transferschecker_test.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "fmt"
  17. "os"
  18. "path"
  19. "path/filepath"
  20. "strconv"
  21. "strings"
  22. "testing"
  23. "time"
  24. "github.com/rs/xid"
  25. "github.com/sftpgo/sdk"
  26. "github.com/stretchr/testify/assert"
  27. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  28. "github.com/drakkan/sftpgo/v2/internal/util"
  29. "github.com/drakkan/sftpgo/v2/internal/vfs"
  30. )
  31. func TestTransfersCheckerDiskQuota(t *testing.T) {
  32. username := "transfers_check_username"
  33. folderName := "test_transfers_folder"
  34. groupName := "test_transfers_group"
  35. vdirPath := "/vdir"
  36. group := dataprovider.Group{
  37. BaseGroup: sdk.BaseGroup{
  38. Name: groupName,
  39. },
  40. UserSettings: dataprovider.GroupUserSettings{
  41. BaseGroupUserSettings: sdk.BaseGroupUserSettings{
  42. QuotaSize: 120,
  43. },
  44. },
  45. }
  46. folder := vfs.BaseVirtualFolder{
  47. Name: folderName,
  48. MappedPath: filepath.Join(os.TempDir(), folderName),
  49. }
  50. user := dataprovider.User{
  51. BaseUser: sdk.BaseUser{
  52. Username: username,
  53. Password: "testpwd",
  54. HomeDir: filepath.Join(os.TempDir(), username),
  55. Status: 1,
  56. QuotaSize: 0, // the quota size defined for the group is used
  57. Permissions: map[string][]string{
  58. "/": {dataprovider.PermAny},
  59. },
  60. },
  61. VirtualFolders: []vfs.VirtualFolder{
  62. {
  63. BaseVirtualFolder: vfs.BaseVirtualFolder{
  64. Name: folderName,
  65. },
  66. VirtualPath: vdirPath,
  67. QuotaSize: 100,
  68. },
  69. },
  70. Groups: []sdk.GroupMapping{
  71. {
  72. Name: groupName,
  73. Type: sdk.GroupTypePrimary,
  74. },
  75. },
  76. }
  77. err := dataprovider.AddGroup(&group, "", "", "")
  78. assert.NoError(t, err)
  79. group, err = dataprovider.GroupExists(groupName)
  80. assert.NoError(t, err)
  81. err = dataprovider.AddFolder(&folder, "", "", "")
  82. assert.NoError(t, err)
  83. assert.Equal(t, int64(120), group.UserSettings.QuotaSize)
  84. err = dataprovider.AddUser(&user, "", "", "")
  85. assert.NoError(t, err)
  86. user, err = dataprovider.GetUserWithGroupSettings(username, "")
  87. assert.NoError(t, err)
  88. connID1 := xid.New().String()
  89. fsUser, err := user.GetFilesystemForPath("/file1", connID1)
  90. assert.NoError(t, err)
  91. conn1 := NewBaseConnection(connID1, ProtocolSFTP, "", "", user)
  92. fakeConn1 := &fakeConnection{
  93. BaseConnection: conn1,
  94. }
  95. transfer1 := NewBaseTransfer(nil, conn1, nil, filepath.Join(user.HomeDir, "file1"), filepath.Join(user.HomeDir, "file1"),
  96. "/file1", TransferUpload, 0, 0, 120, 0, true, fsUser, dataprovider.TransferQuota{})
  97. transfer1.BytesReceived.Store(150)
  98. err = Connections.Add(fakeConn1)
  99. assert.NoError(t, err)
  100. // the transferschecker will do nothing if there is only one ongoing transfer
  101. Connections.checkTransfers()
  102. assert.Nil(t, transfer1.errAbort)
  103. connID2 := xid.New().String()
  104. conn2 := NewBaseConnection(connID2, ProtocolSFTP, "", "", user)
  105. fakeConn2 := &fakeConnection{
  106. BaseConnection: conn2,
  107. }
  108. transfer2 := NewBaseTransfer(nil, conn2, nil, filepath.Join(user.HomeDir, "file2"), filepath.Join(user.HomeDir, "file2"),
  109. "/file2", TransferUpload, 0, 0, 120, 40, true, fsUser, dataprovider.TransferQuota{})
  110. transfer1.BytesReceived.Store(50)
  111. transfer2.BytesReceived.Store(60)
  112. err = Connections.Add(fakeConn2)
  113. assert.NoError(t, err)
  114. connID3 := xid.New().String()
  115. conn3 := NewBaseConnection(connID3, ProtocolSFTP, "", "", user)
  116. fakeConn3 := &fakeConnection{
  117. BaseConnection: conn3,
  118. }
  119. transfer3 := NewBaseTransfer(nil, conn3, nil, filepath.Join(user.HomeDir, "file3"), filepath.Join(user.HomeDir, "file3"),
  120. "/file3", TransferDownload, 0, 0, 120, 0, true, fsUser, dataprovider.TransferQuota{})
  121. transfer3.BytesReceived.Store(60) // this value will be ignored, this is a download
  122. err = Connections.Add(fakeConn3)
  123. assert.NoError(t, err)
  124. // the transfers are not overquota
  125. Connections.checkTransfers()
  126. assert.Nil(t, transfer1.errAbort)
  127. assert.Nil(t, transfer2.errAbort)
  128. assert.Nil(t, transfer3.errAbort)
  129. transfer1.BytesReceived.Store(80) // truncated size will be subtracted, we are not overquota
  130. Connections.checkTransfers()
  131. assert.Nil(t, transfer1.errAbort)
  132. assert.Nil(t, transfer2.errAbort)
  133. assert.Nil(t, transfer3.errAbort)
  134. transfer1.BytesReceived.Store(120)
  135. // we are now overquota
  136. // if another check is in progress nothing is done
  137. Connections.transfersCheckStatus.Store(true)
  138. Connections.checkTransfers()
  139. assert.Nil(t, transfer1.errAbort)
  140. assert.Nil(t, transfer2.errAbort)
  141. assert.Nil(t, transfer3.errAbort)
  142. Connections.transfersCheckStatus.Store(false)
  143. Connections.checkTransfers()
  144. assert.True(t, conn1.IsQuotaExceededError(transfer1.errAbort), transfer1.errAbort)
  145. assert.True(t, conn2.IsQuotaExceededError(transfer2.errAbort), transfer2.errAbort)
  146. assert.True(t, conn1.IsQuotaExceededError(transfer1.GetAbortError()))
  147. assert.Nil(t, transfer3.errAbort)
  148. assert.True(t, conn3.IsQuotaExceededError(transfer3.GetAbortError()))
  149. // update the user quota size
  150. group.UserSettings.QuotaSize = 1000
  151. err = dataprovider.UpdateGroup(&group, []string{username}, "", "", "")
  152. assert.NoError(t, err)
  153. transfer1.errAbort = nil
  154. transfer2.errAbort = nil
  155. Connections.checkTransfers()
  156. assert.Nil(t, transfer1.errAbort)
  157. assert.Nil(t, transfer2.errAbort)
  158. assert.Nil(t, transfer3.errAbort)
  159. group.UserSettings.QuotaSize = 0
  160. err = dataprovider.UpdateGroup(&group, []string{username}, "", "", "")
  161. assert.NoError(t, err)
  162. Connections.checkTransfers()
  163. assert.Nil(t, transfer1.errAbort)
  164. assert.Nil(t, transfer2.errAbort)
  165. assert.Nil(t, transfer3.errAbort)
  166. // now check a public folder
  167. transfer1.BytesReceived.Store(0)
  168. transfer2.BytesReceived.Store(0)
  169. connID4 := xid.New().String()
  170. fsFolder, err := user.GetFilesystemForPath(path.Join(vdirPath, "/file1"), connID4)
  171. assert.NoError(t, err)
  172. conn4 := NewBaseConnection(connID4, ProtocolSFTP, "", "", user)
  173. fakeConn4 := &fakeConnection{
  174. BaseConnection: conn4,
  175. }
  176. transfer4 := NewBaseTransfer(nil, conn4, nil, filepath.Join(os.TempDir(), folderName, "file1"),
  177. filepath.Join(os.TempDir(), folderName, "file1"), path.Join(vdirPath, "/file1"), TransferUpload, 0, 0,
  178. 100, 0, true, fsFolder, dataprovider.TransferQuota{})
  179. err = Connections.Add(fakeConn4)
  180. assert.NoError(t, err)
  181. connID5 := xid.New().String()
  182. conn5 := NewBaseConnection(connID5, ProtocolSFTP, "", "", user)
  183. fakeConn5 := &fakeConnection{
  184. BaseConnection: conn5,
  185. }
  186. transfer5 := NewBaseTransfer(nil, conn5, nil, filepath.Join(os.TempDir(), folderName, "file2"),
  187. filepath.Join(os.TempDir(), folderName, "file2"), path.Join(vdirPath, "/file2"), TransferUpload, 0, 0,
  188. 100, 0, true, fsFolder, dataprovider.TransferQuota{})
  189. err = Connections.Add(fakeConn5)
  190. assert.NoError(t, err)
  191. transfer4.BytesReceived.Store(50)
  192. transfer5.BytesReceived.Store(40)
  193. Connections.checkTransfers()
  194. assert.Nil(t, transfer4.errAbort)
  195. assert.Nil(t, transfer5.errAbort)
  196. transfer5.BytesReceived.Store(60)
  197. Connections.checkTransfers()
  198. assert.Nil(t, transfer1.errAbort)
  199. assert.Nil(t, transfer2.errAbort)
  200. assert.Nil(t, transfer3.errAbort)
  201. assert.True(t, conn1.IsQuotaExceededError(transfer4.errAbort))
  202. assert.True(t, conn2.IsQuotaExceededError(transfer5.errAbort))
  203. if dataprovider.GetProviderStatus().Driver != dataprovider.MemoryDataProviderName {
  204. providerConf := dataprovider.GetProviderConfig()
  205. err = dataprovider.Close()
  206. assert.NoError(t, err)
  207. transfer4.errAbort = nil
  208. transfer5.errAbort = nil
  209. Connections.checkTransfers()
  210. assert.Nil(t, transfer1.errAbort)
  211. assert.Nil(t, transfer2.errAbort)
  212. assert.Nil(t, transfer3.errAbort)
  213. assert.Nil(t, transfer4.errAbort)
  214. assert.Nil(t, transfer5.errAbort)
  215. err = dataprovider.Initialize(providerConf, configDir, true)
  216. assert.NoError(t, err)
  217. }
  218. err = transfer1.Close()
  219. assert.NoError(t, err)
  220. err = transfer2.Close()
  221. assert.NoError(t, err)
  222. err = transfer3.Close()
  223. assert.NoError(t, err)
  224. err = transfer4.Close()
  225. assert.NoError(t, err)
  226. err = transfer5.Close()
  227. assert.NoError(t, err)
  228. Connections.Remove(fakeConn1.GetID())
  229. Connections.Remove(fakeConn2.GetID())
  230. Connections.Remove(fakeConn3.GetID())
  231. Connections.Remove(fakeConn4.GetID())
  232. Connections.Remove(fakeConn5.GetID())
  233. stats := Connections.GetStats("")
  234. assert.Len(t, stats, 0)
  235. err = dataprovider.DeleteUser(user.Username, "", "", "")
  236. assert.NoError(t, err)
  237. err = os.RemoveAll(user.GetHomeDir())
  238. assert.NoError(t, err)
  239. err = dataprovider.DeleteFolder(folderName, "", "", "")
  240. assert.NoError(t, err)
  241. err = os.RemoveAll(filepath.Join(os.TempDir(), folderName))
  242. assert.NoError(t, err)
  243. err = dataprovider.DeleteGroup(groupName, "", "", "")
  244. assert.NoError(t, err)
  245. }
  246. func TestTransferCheckerTransferQuota(t *testing.T) {
  247. username := "transfers_check_username"
  248. user := dataprovider.User{
  249. BaseUser: sdk.BaseUser{
  250. Username: username,
  251. Password: "test_pwd",
  252. HomeDir: filepath.Join(os.TempDir(), username),
  253. Status: 1,
  254. TotalDataTransfer: 1,
  255. Permissions: map[string][]string{
  256. "/": {dataprovider.PermAny},
  257. },
  258. },
  259. }
  260. err := dataprovider.AddUser(&user, "", "", "")
  261. assert.NoError(t, err)
  262. connID1 := xid.New().String()
  263. fsUser, err := user.GetFilesystemForPath("/file1", connID1)
  264. assert.NoError(t, err)
  265. conn1 := NewBaseConnection(connID1, ProtocolSFTP, "", "192.168.1.1", user)
  266. fakeConn1 := &fakeConnection{
  267. BaseConnection: conn1,
  268. }
  269. transfer1 := NewBaseTransfer(nil, conn1, nil, filepath.Join(user.HomeDir, "file1"), filepath.Join(user.HomeDir, "file1"),
  270. "/file1", TransferUpload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedTotalSize: 100})
  271. transfer1.BytesReceived.Store(150)
  272. err = Connections.Add(fakeConn1)
  273. assert.NoError(t, err)
  274. // the transferschecker will do nothing if there is only one ongoing transfer
  275. Connections.checkTransfers()
  276. assert.Nil(t, transfer1.errAbort)
  277. connID2 := xid.New().String()
  278. conn2 := NewBaseConnection(connID2, ProtocolSFTP, "", "127.0.0.1", user)
  279. fakeConn2 := &fakeConnection{
  280. BaseConnection: conn2,
  281. }
  282. transfer2 := NewBaseTransfer(nil, conn2, nil, filepath.Join(user.HomeDir, "file2"), filepath.Join(user.HomeDir, "file2"),
  283. "/file2", TransferUpload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedTotalSize: 100})
  284. transfer2.BytesReceived.Store(150)
  285. err = Connections.Add(fakeConn2)
  286. assert.NoError(t, err)
  287. Connections.checkTransfers()
  288. assert.Nil(t, transfer1.errAbort)
  289. assert.Nil(t, transfer2.errAbort)
  290. // now test overquota
  291. transfer1.BytesReceived.Store(1024*1024 + 1)
  292. transfer2.BytesReceived.Store(0)
  293. Connections.checkTransfers()
  294. assert.True(t, conn1.IsQuotaExceededError(transfer1.errAbort), transfer1.errAbort)
  295. assert.Nil(t, transfer2.errAbort)
  296. transfer1.errAbort = nil
  297. transfer1.BytesReceived.Store(1024*1024 + 1)
  298. transfer2.BytesReceived.Store(1024)
  299. Connections.checkTransfers()
  300. assert.True(t, conn1.IsQuotaExceededError(transfer1.errAbort))
  301. assert.True(t, conn2.IsQuotaExceededError(transfer2.errAbort))
  302. transfer1.BytesReceived.Store(0)
  303. transfer2.BytesReceived.Store(0)
  304. transfer1.errAbort = nil
  305. transfer2.errAbort = nil
  306. err = transfer1.Close()
  307. assert.NoError(t, err)
  308. err = transfer2.Close()
  309. assert.NoError(t, err)
  310. Connections.Remove(fakeConn1.GetID())
  311. Connections.Remove(fakeConn2.GetID())
  312. connID3 := xid.New().String()
  313. conn3 := NewBaseConnection(connID3, ProtocolSFTP, "", "", user)
  314. fakeConn3 := &fakeConnection{
  315. BaseConnection: conn3,
  316. }
  317. transfer3 := NewBaseTransfer(nil, conn3, nil, filepath.Join(user.HomeDir, "file1"), filepath.Join(user.HomeDir, "file1"),
  318. "/file1", TransferDownload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedDLSize: 100})
  319. transfer3.BytesSent.Store(150)
  320. err = Connections.Add(fakeConn3)
  321. assert.NoError(t, err)
  322. connID4 := xid.New().String()
  323. conn4 := NewBaseConnection(connID4, ProtocolSFTP, "", "", user)
  324. fakeConn4 := &fakeConnection{
  325. BaseConnection: conn4,
  326. }
  327. transfer4 := NewBaseTransfer(nil, conn4, nil, filepath.Join(user.HomeDir, "file2"), filepath.Join(user.HomeDir, "file2"),
  328. "/file2", TransferDownload, 0, 0, 0, 0, true, fsUser, dataprovider.TransferQuota{AllowedDLSize: 100})
  329. transfer4.BytesSent.Store(150)
  330. err = Connections.Add(fakeConn4)
  331. assert.NoError(t, err)
  332. Connections.checkTransfers()
  333. assert.Nil(t, transfer3.errAbort)
  334. assert.Nil(t, transfer4.errAbort)
  335. transfer3.BytesSent.Store(512 * 1024)
  336. transfer4.BytesSent.Store(512*1024 + 1)
  337. Connections.checkTransfers()
  338. if assert.Error(t, transfer3.errAbort) {
  339. assert.Contains(t, transfer3.errAbort.Error(), ErrReadQuotaExceeded.Error())
  340. }
  341. if assert.Error(t, transfer4.errAbort) {
  342. assert.Contains(t, transfer4.errAbort.Error(), ErrReadQuotaExceeded.Error())
  343. }
  344. Connections.Remove(fakeConn3.GetID())
  345. Connections.Remove(fakeConn4.GetID())
  346. stats := Connections.GetStats("")
  347. assert.Len(t, stats, 0)
  348. err = dataprovider.DeleteUser(user.Username, "", "", "")
  349. assert.NoError(t, err)
  350. err = os.RemoveAll(user.GetHomeDir())
  351. assert.NoError(t, err)
  352. }
  353. func TestAggregateTransfers(t *testing.T) {
  354. checker := transfersCheckerMem{}
  355. checker.AddTransfer(dataprovider.ActiveTransfer{
  356. ID: 1,
  357. Type: TransferUpload,
  358. ConnID: "1",
  359. Username: "user",
  360. FolderName: "",
  361. TruncatedSize: 0,
  362. CurrentULSize: 100,
  363. CurrentDLSize: 0,
  364. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  365. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  366. })
  367. usersToFetch, aggregations := checker.aggregateUploadTransfers()
  368. assert.Len(t, usersToFetch, 0)
  369. assert.Len(t, aggregations, 1)
  370. checker.AddTransfer(dataprovider.ActiveTransfer{
  371. ID: 1,
  372. Type: TransferDownload,
  373. ConnID: "2",
  374. Username: "user",
  375. FolderName: "",
  376. TruncatedSize: 0,
  377. CurrentULSize: 0,
  378. CurrentDLSize: 100,
  379. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  380. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  381. })
  382. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  383. assert.Len(t, usersToFetch, 0)
  384. assert.Len(t, aggregations, 1)
  385. checker.AddTransfer(dataprovider.ActiveTransfer{
  386. ID: 1,
  387. Type: TransferUpload,
  388. ConnID: "3",
  389. Username: "user",
  390. FolderName: "folder",
  391. TruncatedSize: 0,
  392. CurrentULSize: 10,
  393. CurrentDLSize: 0,
  394. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  395. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  396. })
  397. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  398. assert.Len(t, usersToFetch, 0)
  399. assert.Len(t, aggregations, 2)
  400. checker.AddTransfer(dataprovider.ActiveTransfer{
  401. ID: 1,
  402. Type: TransferUpload,
  403. ConnID: "4",
  404. Username: "user1",
  405. FolderName: "",
  406. TruncatedSize: 0,
  407. CurrentULSize: 100,
  408. CurrentDLSize: 0,
  409. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  410. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  411. })
  412. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  413. assert.Len(t, usersToFetch, 0)
  414. assert.Len(t, aggregations, 3)
  415. checker.AddTransfer(dataprovider.ActiveTransfer{
  416. ID: 1,
  417. Type: TransferUpload,
  418. ConnID: "5",
  419. Username: "user",
  420. FolderName: "",
  421. TruncatedSize: 0,
  422. CurrentULSize: 100,
  423. CurrentDLSize: 0,
  424. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  425. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  426. })
  427. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  428. assert.Len(t, usersToFetch, 1)
  429. val, ok := usersToFetch["user"]
  430. assert.True(t, ok)
  431. assert.False(t, val)
  432. assert.Len(t, aggregations, 3)
  433. aggregate, ok := aggregations[0]
  434. assert.True(t, ok)
  435. assert.Len(t, aggregate, 2)
  436. checker.AddTransfer(dataprovider.ActiveTransfer{
  437. ID: 1,
  438. Type: TransferUpload,
  439. ConnID: "6",
  440. Username: "user",
  441. FolderName: "",
  442. TruncatedSize: 0,
  443. CurrentULSize: 100,
  444. CurrentDLSize: 0,
  445. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  446. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  447. })
  448. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  449. assert.Len(t, usersToFetch, 1)
  450. val, ok = usersToFetch["user"]
  451. assert.True(t, ok)
  452. assert.False(t, val)
  453. assert.Len(t, aggregations, 3)
  454. aggregate, ok = aggregations[0]
  455. assert.True(t, ok)
  456. assert.Len(t, aggregate, 3)
  457. checker.AddTransfer(dataprovider.ActiveTransfer{
  458. ID: 1,
  459. Type: TransferUpload,
  460. ConnID: "7",
  461. Username: "user",
  462. FolderName: "folder",
  463. TruncatedSize: 0,
  464. CurrentULSize: 10,
  465. CurrentDLSize: 0,
  466. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  467. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  468. })
  469. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  470. assert.Len(t, usersToFetch, 1)
  471. val, ok = usersToFetch["user"]
  472. assert.True(t, ok)
  473. assert.True(t, val)
  474. assert.Len(t, aggregations, 3)
  475. aggregate, ok = aggregations[0]
  476. assert.True(t, ok)
  477. assert.Len(t, aggregate, 3)
  478. aggregate, ok = aggregations[1]
  479. assert.True(t, ok)
  480. assert.Len(t, aggregate, 2)
  481. checker.AddTransfer(dataprovider.ActiveTransfer{
  482. ID: 1,
  483. Type: TransferUpload,
  484. ConnID: "8",
  485. Username: "user",
  486. FolderName: "",
  487. TruncatedSize: 0,
  488. CurrentULSize: 100,
  489. CurrentDLSize: 0,
  490. CreatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  491. UpdatedAt: util.GetTimeAsMsSinceEpoch(time.Now()),
  492. })
  493. usersToFetch, aggregations = checker.aggregateUploadTransfers()
  494. assert.Len(t, usersToFetch, 1)
  495. val, ok = usersToFetch["user"]
  496. assert.True(t, ok)
  497. assert.True(t, val)
  498. assert.Len(t, aggregations, 3)
  499. aggregate, ok = aggregations[0]
  500. assert.True(t, ok)
  501. assert.Len(t, aggregate, 4)
  502. aggregate, ok = aggregations[1]
  503. assert.True(t, ok)
  504. assert.Len(t, aggregate, 2)
  505. }
  506. func TestDataTransferExceeded(t *testing.T) {
  507. user := dataprovider.User{
  508. BaseUser: sdk.BaseUser{
  509. TotalDataTransfer: 1,
  510. },
  511. }
  512. transfer := dataprovider.ActiveTransfer{
  513. CurrentULSize: 0,
  514. CurrentDLSize: 0,
  515. }
  516. user.UsedDownloadDataTransfer = 1024 * 1024
  517. user.UsedUploadDataTransfer = 512 * 1024
  518. checker := transfersCheckerMem{}
  519. res := checker.isDataTransferExceeded(user, transfer, 100, 100)
  520. assert.False(t, res)
  521. transfer.CurrentULSize = 1
  522. res = checker.isDataTransferExceeded(user, transfer, 100, 100)
  523. assert.True(t, res)
  524. user.UsedDownloadDataTransfer = 512*1024 - 100
  525. user.UsedUploadDataTransfer = 512*1024 - 100
  526. res = checker.isDataTransferExceeded(user, transfer, 100, 100)
  527. assert.False(t, res)
  528. res = checker.isDataTransferExceeded(user, transfer, 101, 100)
  529. assert.True(t, res)
  530. user.TotalDataTransfer = 0
  531. user.DownloadDataTransfer = 1
  532. user.UsedDownloadDataTransfer = 512 * 1024
  533. transfer.CurrentULSize = 0
  534. transfer.CurrentDLSize = 100
  535. res = checker.isDataTransferExceeded(user, transfer, 0, 512*1024)
  536. assert.False(t, res)
  537. res = checker.isDataTransferExceeded(user, transfer, 0, 512*1024+1)
  538. assert.True(t, res)
  539. user.DownloadDataTransfer = 0
  540. user.UploadDataTransfer = 1
  541. user.UsedUploadDataTransfer = 512 * 1024
  542. transfer.CurrentULSize = 0
  543. transfer.CurrentDLSize = 0
  544. res = checker.isDataTransferExceeded(user, transfer, 512*1024+1, 0)
  545. assert.False(t, res)
  546. transfer.CurrentULSize = 1
  547. res = checker.isDataTransferExceeded(user, transfer, 512*1024+1, 0)
  548. assert.True(t, res)
  549. }
  550. func TestGetUsersForQuotaCheck(t *testing.T) {
  551. usersToFetch := make(map[string]bool)
  552. for i := 0; i < 70; i++ {
  553. usersToFetch[fmt.Sprintf("user%v", i)] = i%2 == 0
  554. }
  555. users, err := dataprovider.GetUsersForQuotaCheck(usersToFetch)
  556. assert.NoError(t, err)
  557. assert.Len(t, users, 0)
  558. for i := 0; i < 60; i++ {
  559. folder := vfs.BaseVirtualFolder{
  560. Name: fmt.Sprintf("f%v", i),
  561. MappedPath: filepath.Join(os.TempDir(), fmt.Sprintf("f%v", i)),
  562. }
  563. user := dataprovider.User{
  564. BaseUser: sdk.BaseUser{
  565. Username: fmt.Sprintf("user%v", i),
  566. Password: "pwd",
  567. HomeDir: filepath.Join(os.TempDir(), fmt.Sprintf("user%v", i)),
  568. Status: 1,
  569. QuotaSize: 120,
  570. Permissions: map[string][]string{
  571. "/": {dataprovider.PermAny},
  572. },
  573. },
  574. VirtualFolders: []vfs.VirtualFolder{
  575. {
  576. BaseVirtualFolder: vfs.BaseVirtualFolder{
  577. Name: folder.Name,
  578. },
  579. VirtualPath: "/vfolder",
  580. QuotaSize: 100,
  581. },
  582. },
  583. }
  584. err = dataprovider.AddFolder(&folder, "", "", "")
  585. assert.NoError(t, err)
  586. err = dataprovider.AddUser(&user, "", "", "")
  587. assert.NoError(t, err)
  588. err = dataprovider.UpdateVirtualFolderQuota(&vfs.BaseVirtualFolder{Name: fmt.Sprintf("f%v", i)}, 1, 50, false)
  589. assert.NoError(t, err)
  590. }
  591. users, err = dataprovider.GetUsersForQuotaCheck(usersToFetch)
  592. assert.NoError(t, err)
  593. assert.Len(t, users, 60)
  594. for _, user := range users {
  595. userIdxStr := strings.Replace(user.Username, "user", "", 1)
  596. userIdx, err := strconv.Atoi(userIdxStr)
  597. assert.NoError(t, err)
  598. if userIdx%2 == 0 {
  599. if assert.Len(t, user.VirtualFolders, 1, user.Username) {
  600. assert.Equal(t, int64(100), user.VirtualFolders[0].QuotaSize)
  601. assert.Equal(t, int64(50), user.VirtualFolders[0].UsedQuotaSize)
  602. }
  603. } else {
  604. switch dataprovider.GetProviderStatus().Driver {
  605. case dataprovider.MySQLDataProviderName, dataprovider.PGSQLDataProviderName,
  606. dataprovider.CockroachDataProviderName, dataprovider.SQLiteDataProviderName:
  607. assert.Len(t, user.VirtualFolders, 0, user.Username)
  608. }
  609. }
  610. ul, dl, total := user.GetDataTransferLimits()
  611. assert.Equal(t, int64(0), ul)
  612. assert.Equal(t, int64(0), dl)
  613. assert.Equal(t, int64(0), total)
  614. }
  615. for i := 0; i < 60; i++ {
  616. err = dataprovider.DeleteUser(fmt.Sprintf("user%v", i), "", "", "")
  617. assert.NoError(t, err)
  618. err = dataprovider.DeleteFolder(fmt.Sprintf("f%v", i), "", "", "")
  619. assert.NoError(t, err)
  620. }
  621. users, err = dataprovider.GetUsersForQuotaCheck(usersToFetch)
  622. assert.NoError(t, err)
  623. assert.Len(t, users, 0)
  624. }
  625. func TestDBTransferChecker(t *testing.T) {
  626. if !isDbTransferCheckerSupported() {
  627. t.Skip("this test is not supported with the current database provider")
  628. }
  629. providerConf := dataprovider.GetProviderConfig()
  630. err := dataprovider.Close()
  631. assert.NoError(t, err)
  632. providerConf.IsShared = 1
  633. err = dataprovider.Initialize(providerConf, configDir, true)
  634. assert.NoError(t, err)
  635. c := getTransfersChecker(1)
  636. checker, ok := c.(*transfersCheckerDB)
  637. assert.True(t, ok)
  638. assert.True(t, checker.lastCleanup.IsZero())
  639. transfer1 := dataprovider.ActiveTransfer{
  640. ID: 1,
  641. Type: TransferDownload,
  642. ConnID: xid.New().String(),
  643. Username: "user1",
  644. FolderName: "folder1",
  645. IP: "127.0.0.1",
  646. }
  647. checker.AddTransfer(transfer1)
  648. transfers, err := dataprovider.GetActiveTransfers(time.Now().Add(24 * time.Hour))
  649. assert.NoError(t, err)
  650. assert.Len(t, transfers, 0)
  651. transfers, err = dataprovider.GetActiveTransfers(time.Now().Add(-periodicTimeoutCheckInterval * 2))
  652. assert.NoError(t, err)
  653. var createdAt, updatedAt int64
  654. if assert.Len(t, transfers, 1) {
  655. transfer := transfers[0]
  656. assert.Equal(t, transfer1.ID, transfer.ID)
  657. assert.Equal(t, transfer1.Type, transfer.Type)
  658. assert.Equal(t, transfer1.ConnID, transfer.ConnID)
  659. assert.Equal(t, transfer1.Username, transfer.Username)
  660. assert.Equal(t, transfer1.IP, transfer.IP)
  661. assert.Equal(t, transfer1.FolderName, transfer.FolderName)
  662. assert.Greater(t, transfer.CreatedAt, int64(0))
  663. assert.Greater(t, transfer.UpdatedAt, int64(0))
  664. assert.Equal(t, int64(0), transfer.CurrentDLSize)
  665. assert.Equal(t, int64(0), transfer.CurrentULSize)
  666. createdAt = transfer.CreatedAt
  667. updatedAt = transfer.UpdatedAt
  668. }
  669. time.Sleep(100 * time.Millisecond)
  670. checker.UpdateTransferCurrentSizes(100, 150, transfer1.ID, transfer1.ConnID)
  671. transfers, err = dataprovider.GetActiveTransfers(time.Now().Add(-periodicTimeoutCheckInterval * 2))
  672. assert.NoError(t, err)
  673. if assert.Len(t, transfers, 1) {
  674. transfer := transfers[0]
  675. assert.Equal(t, int64(150), transfer.CurrentDLSize)
  676. assert.Equal(t, int64(100), transfer.CurrentULSize)
  677. assert.Equal(t, createdAt, transfer.CreatedAt)
  678. assert.Greater(t, transfer.UpdatedAt, updatedAt)
  679. }
  680. res := checker.GetOverquotaTransfers()
  681. assert.Len(t, res, 0)
  682. checker.RemoveTransfer(transfer1.ID, transfer1.ConnID)
  683. transfers, err = dataprovider.GetActiveTransfers(time.Now().Add(-periodicTimeoutCheckInterval * 2))
  684. assert.NoError(t, err)
  685. assert.Len(t, transfers, 0)
  686. err = dataprovider.Close()
  687. assert.NoError(t, err)
  688. res = checker.GetOverquotaTransfers()
  689. assert.Len(t, res, 0)
  690. providerConf.IsShared = 0
  691. err = dataprovider.Initialize(providerConf, configDir, true)
  692. assert.NoError(t, err)
  693. }
  694. func isDbTransferCheckerSupported() bool {
  695. // SQLite shares the implementation with other SQL-based provider but it makes no sense
  696. // to use it outside test cases
  697. switch dataprovider.GetProviderStatus().Driver {
  698. case dataprovider.MySQLDataProviderName, dataprovider.PGSQLDataProviderName,
  699. dataprovider.CockroachDataProviderName, dataprovider.SQLiteDataProviderName:
  700. return true
  701. default:
  702. return false
  703. }
  704. }