| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 | 
							- // Copyright (C) 2019 Nicola Murino
 
- //
 
- // This program is free software: you can redistribute it and/or modify
 
- // it under the terms of the GNU Affero General Public License as published
 
- // by the Free Software Foundation, version 3.
 
- //
 
- // This program is distributed in the hope that it will be useful,
 
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
 
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 
- // GNU Affero General Public License for more details.
 
- //
 
- // You should have received a copy of the GNU Affero General Public License
 
- // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
- package common
 
- import (
 
- 	"errors"
 
- 	"sync"
 
- 	"time"
 
- 	"github.com/drakkan/sftpgo/v2/internal/dataprovider"
 
- 	"github.com/drakkan/sftpgo/v2/internal/logger"
 
- 	"github.com/drakkan/sftpgo/v2/internal/util"
 
- )
 
- type overquotaTransfer struct {
 
- 	ConnID       string
 
- 	TransferID   int64
 
- 	TransferType int
 
- }
 
- type uploadAggregationKey struct {
 
- 	Username   string
 
- 	FolderName string
 
- }
 
- // TransfersChecker defines the interface that transfer checkers must implement.
 
- // A transfer checker ensure that multiple concurrent transfers does not exceeded
 
- // the remaining user quota
 
- type TransfersChecker interface {
 
- 	AddTransfer(transfer dataprovider.ActiveTransfer)
 
- 	RemoveTransfer(ID int64, connectionID string)
 
- 	UpdateTransferCurrentSizes(ulSize, dlSize, ID int64, connectionID string)
 
- 	GetOverquotaTransfers() []overquotaTransfer
 
- }
 
- func getTransfersChecker(isShared int) TransfersChecker {
 
- 	if isShared == 1 {
 
- 		logger.Info(logSender, "", "using provider transfer checker")
 
- 		return &transfersCheckerDB{}
 
- 	}
 
- 	logger.Info(logSender, "", "using memory transfer checker")
 
- 	return &transfersCheckerMem{}
 
- }
 
- type baseTransferChecker struct {
 
- 	transfers []dataprovider.ActiveTransfer
 
- }
 
- func (t *baseTransferChecker) isDataTransferExceeded(user dataprovider.User, transfer dataprovider.ActiveTransfer, ulSize,
 
- 	dlSize int64,
 
- ) bool {
 
- 	ulQuota, dlQuota, totalQuota := user.GetDataTransferLimits()
 
- 	if totalQuota > 0 {
 
- 		allowedSize := totalQuota - (user.UsedUploadDataTransfer + user.UsedDownloadDataTransfer)
 
- 		if ulSize+dlSize > allowedSize {
 
- 			return transfer.CurrentDLSize > 0 || transfer.CurrentULSize > 0
 
- 		}
 
- 	}
 
- 	if dlQuota > 0 {
 
- 		allowedSize := dlQuota - user.UsedDownloadDataTransfer
 
- 		if dlSize > allowedSize {
 
- 			return transfer.CurrentDLSize > 0
 
- 		}
 
- 	}
 
- 	if ulQuota > 0 {
 
- 		allowedSize := ulQuota - user.UsedUploadDataTransfer
 
- 		if ulSize > allowedSize {
 
- 			return transfer.CurrentULSize > 0
 
- 		}
 
- 	}
 
- 	return false
 
- }
 
- func (t *baseTransferChecker) getRemainingDiskQuota(user dataprovider.User, folderName string) (int64, error) {
 
- 	var result int64
 
- 	if folderName != "" {
 
- 		for _, folder := range user.VirtualFolders {
 
- 			if folder.Name == folderName {
 
- 				if folder.QuotaSize > 0 {
 
- 					return folder.QuotaSize - folder.UsedQuotaSize, nil
 
- 				}
 
- 			}
 
- 		}
 
- 	} else {
 
- 		if user.QuotaSize > 0 {
 
- 			return user.QuotaSize - user.UsedQuotaSize, nil
 
- 		}
 
- 	}
 
- 	return result, errors.New("no quota limit defined")
 
- }
 
- func (t *baseTransferChecker) aggregateTransfersByUser(usersToFetch map[string]bool,
 
- ) (map[string]bool, map[string][]dataprovider.ActiveTransfer) {
 
- 	aggregations := make(map[string][]dataprovider.ActiveTransfer)
 
- 	for _, transfer := range t.transfers {
 
- 		aggregations[transfer.Username] = append(aggregations[transfer.Username], transfer)
 
- 		if len(aggregations[transfer.Username]) > 1 {
 
- 			if _, ok := usersToFetch[transfer.Username]; !ok {
 
- 				usersToFetch[transfer.Username] = false
 
- 			}
 
- 		}
 
- 	}
 
- 	return usersToFetch, aggregations
 
- }
 
- func (t *baseTransferChecker) aggregateUploadTransfers() (map[string]bool, map[int][]dataprovider.ActiveTransfer) {
 
- 	usersToFetch := make(map[string]bool)
 
- 	aggregations := make(map[int][]dataprovider.ActiveTransfer)
 
- 	var keys []uploadAggregationKey
 
- 	for _, transfer := range t.transfers {
 
- 		if transfer.Type != TransferUpload {
 
- 			continue
 
- 		}
 
- 		key := -1
 
- 		for idx, k := range keys {
 
- 			if k.Username == transfer.Username && k.FolderName == transfer.FolderName {
 
- 				key = idx
 
- 				break
 
- 			}
 
- 		}
 
- 		if key == -1 {
 
- 			key = len(keys)
 
- 		}
 
- 		keys = append(keys, uploadAggregationKey{
 
- 			Username:   transfer.Username,
 
- 			FolderName: transfer.FolderName,
 
- 		})
 
- 		aggregations[key] = append(aggregations[key], transfer)
 
- 		if len(aggregations[key]) > 1 {
 
- 			if transfer.FolderName != "" {
 
- 				usersToFetch[transfer.Username] = true
 
- 			} else {
 
- 				if _, ok := usersToFetch[transfer.Username]; !ok {
 
- 					usersToFetch[transfer.Username] = false
 
- 				}
 
- 			}
 
- 		}
 
- 	}
 
- 	return usersToFetch, aggregations
 
- }
 
- func (t *baseTransferChecker) getUsersToCheck(usersToFetch map[string]bool) (map[string]dataprovider.User, error) {
 
- 	users, err := dataprovider.GetUsersForQuotaCheck(usersToFetch)
 
- 	if err != nil {
 
- 		return nil, err
 
- 	}
 
- 	usersMap := make(map[string]dataprovider.User)
 
- 	for _, user := range users {
 
- 		usersMap[user.Username] = user
 
- 	}
 
- 	return usersMap, nil
 
- }
 
- func (t *baseTransferChecker) getOverquotaTransfers(usersToFetch map[string]bool,
 
- 	uploadAggregations map[int][]dataprovider.ActiveTransfer,
 
- 	userAggregations map[string][]dataprovider.ActiveTransfer,
 
- ) []overquotaTransfer {
 
- 	if len(usersToFetch) == 0 {
 
- 		return nil
 
- 	}
 
- 	usersMap, err := t.getUsersToCheck(usersToFetch)
 
- 	if err != nil {
 
- 		logger.Warn(logSender, "", "unable to check transfers, error getting users quota: %v", err)
 
- 		return nil
 
- 	}
 
- 	var overquotaTransfers []overquotaTransfer
 
- 	for _, transfers := range uploadAggregations {
 
- 		username := transfers[0].Username
 
- 		folderName := transfers[0].FolderName
 
- 		remaningDiskQuota, err := t.getRemainingDiskQuota(usersMap[username], folderName)
 
- 		if err != nil {
 
- 			continue
 
- 		}
 
- 		var usedDiskQuota int64
 
- 		for _, tr := range transfers {
 
- 			// We optimistically assume that a cloud transfer that replaces an existing
 
- 			// file will be successful
 
- 			usedDiskQuota += tr.CurrentULSize - tr.TruncatedSize
 
- 		}
 
- 		logger.Debug(logSender, "", "username %q, folder %q, concurrent transfers: %v, remaining disk quota (bytes): %v, disk quota used in ongoing transfers (bytes): %v",
 
- 			username, folderName, len(transfers), remaningDiskQuota, usedDiskQuota)
 
- 		if usedDiskQuota > remaningDiskQuota {
 
- 			for _, tr := range transfers {
 
- 				if tr.CurrentULSize > tr.TruncatedSize {
 
- 					overquotaTransfers = append(overquotaTransfers, overquotaTransfer{
 
- 						ConnID:       tr.ConnID,
 
- 						TransferID:   tr.ID,
 
- 						TransferType: tr.Type,
 
- 					})
 
- 				}
 
- 			}
 
- 		}
 
- 	}
 
- 	for username, transfers := range userAggregations {
 
- 		var ulSize, dlSize int64
 
- 		for _, tr := range transfers {
 
- 			ulSize += tr.CurrentULSize
 
- 			dlSize += tr.CurrentDLSize
 
- 		}
 
- 		logger.Debug(logSender, "", "username %q, concurrent transfers: %v, quota (bytes) used in ongoing transfers, ul: %v, dl: %v",
 
- 			username, len(transfers), ulSize, dlSize)
 
- 		for _, tr := range transfers {
 
- 			if t.isDataTransferExceeded(usersMap[username], tr, ulSize, dlSize) {
 
- 				overquotaTransfers = append(overquotaTransfers, overquotaTransfer{
 
- 					ConnID:       tr.ConnID,
 
- 					TransferID:   tr.ID,
 
- 					TransferType: tr.Type,
 
- 				})
 
- 			}
 
- 		}
 
- 	}
 
- 	return overquotaTransfers
 
- }
 
- type transfersCheckerMem struct {
 
- 	sync.RWMutex
 
- 	baseTransferChecker
 
- }
 
- func (t *transfersCheckerMem) AddTransfer(transfer dataprovider.ActiveTransfer) {
 
- 	t.Lock()
 
- 	defer t.Unlock()
 
- 	t.transfers = append(t.transfers, transfer)
 
- }
 
- func (t *transfersCheckerMem) RemoveTransfer(ID int64, connectionID string) {
 
- 	t.Lock()
 
- 	defer t.Unlock()
 
- 	for idx, transfer := range t.transfers {
 
- 		if transfer.ID == ID && transfer.ConnID == connectionID {
 
- 			lastIdx := len(t.transfers) - 1
 
- 			t.transfers[idx] = t.transfers[lastIdx]
 
- 			t.transfers = t.transfers[:lastIdx]
 
- 			return
 
- 		}
 
- 	}
 
- }
 
- func (t *transfersCheckerMem) UpdateTransferCurrentSizes(ulSize, dlSize, ID int64, connectionID string) {
 
- 	t.Lock()
 
- 	defer t.Unlock()
 
- 	for idx := range t.transfers {
 
- 		if t.transfers[idx].ID == ID && t.transfers[idx].ConnID == connectionID {
 
- 			t.transfers[idx].CurrentDLSize = dlSize
 
- 			t.transfers[idx].CurrentULSize = ulSize
 
- 			t.transfers[idx].UpdatedAt = util.GetTimeAsMsSinceEpoch(time.Now())
 
- 			return
 
- 		}
 
- 	}
 
- }
 
- func (t *transfersCheckerMem) GetOverquotaTransfers() []overquotaTransfer {
 
- 	t.RLock()
 
- 	usersToFetch, uploadAggregations := t.aggregateUploadTransfers()
 
- 	usersToFetch, userAggregations := t.aggregateTransfersByUser(usersToFetch)
 
- 	t.RUnlock()
 
- 	return t.getOverquotaTransfers(usersToFetch, uploadAggregations, userAggregations)
 
- }
 
- type transfersCheckerDB struct {
 
- 	baseTransferChecker
 
- 	lastCleanup time.Time
 
- }
 
- func (t *transfersCheckerDB) AddTransfer(transfer dataprovider.ActiveTransfer) {
 
- 	dataprovider.AddActiveTransfer(transfer)
 
- }
 
- func (t *transfersCheckerDB) RemoveTransfer(ID int64, connectionID string) {
 
- 	dataprovider.RemoveActiveTransfer(ID, connectionID)
 
- }
 
- func (t *transfersCheckerDB) UpdateTransferCurrentSizes(ulSize, dlSize, ID int64, connectionID string) {
 
- 	dataprovider.UpdateActiveTransferSizes(ulSize, dlSize, ID, connectionID)
 
- }
 
- func (t *transfersCheckerDB) GetOverquotaTransfers() []overquotaTransfer {
 
- 	if t.lastCleanup.IsZero() || t.lastCleanup.Add(periodicTimeoutCheckInterval*15).Before(time.Now()) {
 
- 		before := time.Now().Add(-periodicTimeoutCheckInterval * 5)
 
- 		err := dataprovider.CleanupActiveTransfers(before)
 
- 		logger.Debug(logSender, "", "cleanup active transfers completed, err: %v", err)
 
- 		if err == nil {
 
- 			t.lastCleanup = time.Now()
 
- 		}
 
- 	}
 
- 	var err error
 
- 	from := time.Now().Add(-periodicTimeoutCheckInterval * 2)
 
- 	t.transfers, err = dataprovider.GetActiveTransfers(from)
 
- 	if err != nil {
 
- 		logger.Error(logSender, "", "unable to check overquota transfers, error getting active transfers: %v", err)
 
- 		return nil
 
- 	}
 
- 	usersToFetch, uploadAggregations := t.aggregateUploadTransfers()
 
- 	usersToFetch, userAggregations := t.aggregateTransfersByUser(usersToFetch)
 
- 	return t.getOverquotaTransfers(usersToFetch, uploadAggregations, userAggregations)
 
- }
 
 
  |