| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590 |
- // Package sftpd implements the SSH File Transfer Protocol as described in https://tools.ietf.org/html/draft-ietf-secsh-filexfer-02.
- // It uses pkg/sftp library:
- // https://github.com/pkg/sftp
- package sftpd
- import (
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "net/http"
- "net/url"
- "os"
- "os/exec"
- "path/filepath"
- "strings"
- "sync"
- "time"
- "github.com/drakkan/sftpgo/dataprovider"
- "github.com/drakkan/sftpgo/httpclient"
- "github.com/drakkan/sftpgo/logger"
- "github.com/drakkan/sftpgo/metrics"
- "github.com/drakkan/sftpgo/utils"
- )
- const (
- logSender = "sftpd"
- logSenderSCP = "scp"
- logSenderSSH = "ssh"
- uploadLogSender = "Upload"
- downloadLogSender = "Download"
- renameLogSender = "Rename"
- rmdirLogSender = "Rmdir"
- mkdirLogSender = "Mkdir"
- symlinkLogSender = "Symlink"
- removeLogSender = "Remove"
- chownLogSender = "Chown"
- chmodLogSender = "Chmod"
- chtimesLogSender = "Chtimes"
- sshCommandLogSender = "SSHCommand"
- operationDownload = "download"
- operationUpload = "upload"
- operationDelete = "delete"
- operationPreDelete = "pre-delete"
- operationRename = "rename"
- operationSSHCmd = "ssh_cmd"
- protocolSFTP = "SFTP"
- protocolSCP = "SCP"
- protocolSSH = "SSH"
- handshakeTimeout = 2 * time.Minute
- )
- const (
- uploadModeStandard = iota
- uploadModeAtomic
- uploadModeAtomicWithResume
- )
- var (
- mutex sync.RWMutex
- openConnections map[string]Connection
- activeTransfers []*Transfer
- idleTimeout time.Duration
- activeQuotaScans []ActiveQuotaScan
- activeVFoldersQuotaScan []ActiveVirtualFolderQuotaScan
- dataProvider dataprovider.Provider
- actions Actions
- uploadMode int
- setstatMode int
- supportedSSHCommands = []string{"scp", "md5sum", "sha1sum", "sha256sum", "sha384sum", "sha512sum", "cd", "pwd",
- "git-receive-pack", "git-upload-pack", "git-upload-archive", "rsync"}
- defaultSSHCommands = []string{"md5sum", "sha1sum", "cd", "pwd", "scp"}
- sshHashCommands = []string{"md5sum", "sha1sum", "sha256sum", "sha384sum", "sha512sum"}
- systemCommands = []string{"git-receive-pack", "git-upload-pack", "git-upload-archive", "rsync"}
- errUnconfiguredAction = errors.New("no hook is configured for this action")
- errNoHook = errors.New("unable to execute action, no hook defined")
- errUnexpectedHTTResponse = errors.New("unexpected HTTP response code")
- )
- type connectionTransfer struct {
- OperationType string `json:"operation_type"`
- StartTime int64 `json:"start_time"`
- Size int64 `json:"size"`
- LastActivity int64 `json:"last_activity"`
- Path string `json:"path"`
- }
- // ActiveQuotaScan defines an active quota scan for a user home dir
- type ActiveQuotaScan struct {
- // Username to which the quota scan refers
- Username string `json:"username"`
- // quota scan start time as unix timestamp in milliseconds
- StartTime int64 `json:"start_time"`
- }
- // ActiveVirtualFolderQuotaScan defines an active quota scan for a virtual folder
- type ActiveVirtualFolderQuotaScan struct {
- // folder path to which the quota scan refers
- MappedPath string `json:"mapped_path"`
- // quota scan start time as unix timestamp in milliseconds
- StartTime int64 `json:"start_time"`
- }
- // Actions to execute on SFTP create, download, delete and rename.
- // An external command can be executed and/or an HTTP notification can be fired
- type Actions struct {
- // Valid values are download, upload, delete, rename, ssh_cmd. Empty slice to disable
- ExecuteOn []string `json:"execute_on" mapstructure:"execute_on"`
- // Deprecated: please use Hook
- Command string `json:"command" mapstructure:"command"`
- // Deprecated: please use Hook
- HTTPNotificationURL string `json:"http_notification_url" mapstructure:"http_notification_url"`
- // Absolute path to an external program or an HTTP URL
- Hook string `json:"hook" mapstructure:"hook"`
- }
- // ConnectionStatus status for an active connection
- type ConnectionStatus struct {
- // Logged in username
- Username string `json:"username"`
- // Unique identifier for the connection
- ConnectionID string `json:"connection_id"`
- // client's version string
- ClientVersion string `json:"client_version"`
- // Remote address for this connection
- RemoteAddress string `json:"remote_address"`
- // Connection time as unix timestamp in milliseconds
- ConnectionTime int64 `json:"connection_time"`
- // Last activity as unix timestamp in milliseconds
- LastActivity int64 `json:"last_activity"`
- // Protocol for this connection: SFTP, SCP, SSH
- Protocol string `json:"protocol"`
- // active uploads/downloads
- Transfers []connectionTransfer `json:"active_transfers"`
- // for protocol SSH this is the issued command
- SSHCommand string `json:"ssh_command"`
- }
- type sshSubsystemExitStatus struct {
- Status uint32
- }
- type sshSubsystemExecMsg struct {
- Command string
- }
- type actionNotification struct {
- Action string `json:"action"`
- Username string `json:"username"`
- Path string `json:"path"`
- TargetPath string `json:"target_path,omitempty"`
- SSHCmd string `json:"ssh_cmd,omitempty"`
- FileSize int64 `json:"file_size,omitempty"`
- FsProvider int `json:"fs_provider"`
- Bucket string `json:"bucket,omitempty"`
- Endpoint string `json:"endpoint,omitempty"`
- Status int `json:"status"`
- }
- func newActionNotification(user dataprovider.User, operation, filePath, target, sshCmd string, fileSize int64,
- err error) actionNotification {
- bucket := ""
- endpoint := ""
- status := 1
- if user.FsConfig.Provider == 1 {
- bucket = user.FsConfig.S3Config.Bucket
- endpoint = user.FsConfig.S3Config.Endpoint
- } else if user.FsConfig.Provider == 2 {
- bucket = user.FsConfig.GCSConfig.Bucket
- }
- if err != nil {
- status = 0
- }
- return actionNotification{
- Action: operation,
- Username: user.Username,
- Path: filePath,
- TargetPath: target,
- SSHCmd: sshCmd,
- FileSize: fileSize,
- FsProvider: user.FsConfig.Provider,
- Bucket: bucket,
- Endpoint: endpoint,
- Status: status,
- }
- }
- func (a *actionNotification) AsJSON() []byte {
- res, _ := json.Marshal(a)
- return res
- }
- func (a *actionNotification) AsEnvVars() []string {
- return []string{fmt.Sprintf("SFTPGO_ACTION=%v", a.Action),
- fmt.Sprintf("SFTPGO_ACTION_USERNAME=%v", a.Username),
- fmt.Sprintf("SFTPGO_ACTION_PATH=%v", a.Path),
- fmt.Sprintf("SFTPGO_ACTION_TARGET=%v", a.TargetPath),
- fmt.Sprintf("SFTPGO_ACTION_SSH_CMD=%v", a.SSHCmd),
- fmt.Sprintf("SFTPGO_ACTION_FILE_SIZE=%v", a.FileSize),
- fmt.Sprintf("SFTPGO_ACTION_FS_PROVIDER=%v", a.FsProvider),
- fmt.Sprintf("SFTPGO_ACTION_BUCKET=%v", a.Bucket),
- fmt.Sprintf("SFTPGO_ACTION_ENDPOINT=%v", a.Endpoint),
- fmt.Sprintf("SFTPGO_ACTION_STATUS=%v", a.Status),
- }
- }
- func init() {
- openConnections = make(map[string]Connection)
- }
- // GetDefaultSSHCommands returns the SSH commands enabled as default
- func GetDefaultSSHCommands() []string {
- result := make([]string, len(defaultSSHCommands))
- copy(result, defaultSSHCommands)
- return result
- }
- // GetSupportedSSHCommands returns the supported SSH commands
- func GetSupportedSSHCommands() []string {
- result := make([]string, len(supportedSSHCommands))
- copy(result, supportedSSHCommands)
- return result
- }
- // GetConnectionDuration returns the connection duration as string
- func (c ConnectionStatus) GetConnectionDuration() string {
- elapsed := time.Since(utils.GetTimeFromMsecSinceEpoch(c.ConnectionTime))
- return utils.GetDurationAsString(elapsed)
- }
- // GetConnectionInfo returns connection info.
- // Protocol,Client Version and RemoteAddress are returned.
- // For SSH commands the issued command is returned too.
- func (c ConnectionStatus) GetConnectionInfo() string {
- result := fmt.Sprintf("%v. Client: %#v From: %#v", c.Protocol, c.ClientVersion, c.RemoteAddress)
- if c.Protocol == protocolSSH && len(c.SSHCommand) > 0 {
- result += fmt.Sprintf(". Command: %#v", c.SSHCommand)
- }
- return result
- }
- // GetTransfersAsString returns the active transfers as string
- func (c ConnectionStatus) GetTransfersAsString() string {
- result := ""
- for _, t := range c.Transfers {
- if len(result) > 0 {
- result += ". "
- }
- result += t.getConnectionTransferAsString()
- }
- return result
- }
- func (t connectionTransfer) getConnectionTransferAsString() string {
- result := ""
- if t.OperationType == operationUpload {
- result += "UL"
- } else {
- result += "DL"
- }
- result += fmt.Sprintf(" %#v ", t.Path)
- if t.Size > 0 {
- elapsed := time.Since(utils.GetTimeFromMsecSinceEpoch(t.StartTime))
- speed := float64(t.Size) / float64(utils.GetTimeAsMsSinceEpoch(time.Now())-t.StartTime)
- result += fmt.Sprintf("Size: %#v Elapsed: %#v Speed: \"%.1f KB/s\"", utils.ByteCountSI(t.Size),
- utils.GetDurationAsString(elapsed), speed)
- }
- return result
- }
- // SetDataProvider sets the data provider to use to authenticate users and to get/update their disk quota
- func SetDataProvider(provider dataprovider.Provider) {
- dataProvider = provider
- }
- func getActiveSessions(username string) int {
- mutex.RLock()
- defer mutex.RUnlock()
- numSessions := 0
- for _, c := range openConnections {
- if c.User.Username == username {
- numSessions++
- }
- }
- return numSessions
- }
- // GetQuotaScans returns the active quota scans for users home directories
- func GetQuotaScans() []ActiveQuotaScan {
- mutex.RLock()
- defer mutex.RUnlock()
- scans := make([]ActiveQuotaScan, len(activeQuotaScans))
- copy(scans, activeQuotaScans)
- return scans
- }
- // AddQuotaScan add a user to the ones with active quota scans.
- // Returns false if the user has a quota scan already running
- func AddQuotaScan(username string) bool {
- mutex.Lock()
- defer mutex.Unlock()
- for _, s := range activeQuotaScans {
- if s.Username == username {
- return false
- }
- }
- activeQuotaScans = append(activeQuotaScans, ActiveQuotaScan{
- Username: username,
- StartTime: utils.GetTimeAsMsSinceEpoch(time.Now()),
- })
- return true
- }
- // RemoveQuotaScan removes a user from the ones with active quota scans
- func RemoveQuotaScan(username string) error {
- mutex.Lock()
- defer mutex.Unlock()
- var err error
- indexToRemove := -1
- for i, s := range activeQuotaScans {
- if s.Username == username {
- indexToRemove = i
- break
- }
- }
- if indexToRemove >= 0 {
- activeQuotaScans[indexToRemove] = activeQuotaScans[len(activeQuotaScans)-1]
- activeQuotaScans = activeQuotaScans[:len(activeQuotaScans)-1]
- } else {
- err = fmt.Errorf("quota scan to remove not found for user: %#v", username)
- logger.Warn(logSender, "", "error: %v", err)
- }
- return err
- }
- // GetVFoldersQuotaScans returns the active quota scans for virtual folders
- func GetVFoldersQuotaScans() []ActiveVirtualFolderQuotaScan {
- mutex.RLock()
- defer mutex.RUnlock()
- scans := make([]ActiveVirtualFolderQuotaScan, len(activeVFoldersQuotaScan))
- copy(scans, activeVFoldersQuotaScan)
- return scans
- }
- // AddVFolderQuotaScan add a virtual folder to the ones with active quota scans.
- // Returns false if the folder has a quota scan already running
- func AddVFolderQuotaScan(folderPath string) bool {
- mutex.Lock()
- defer mutex.Unlock()
- for _, s := range activeVFoldersQuotaScan {
- if s.MappedPath == folderPath {
- return false
- }
- }
- activeVFoldersQuotaScan = append(activeVFoldersQuotaScan, ActiveVirtualFolderQuotaScan{
- MappedPath: folderPath,
- StartTime: utils.GetTimeAsMsSinceEpoch(time.Now()),
- })
- return true
- }
- // RemoveVFolderQuotaScan removes a folder from the ones with active quota scans
- func RemoveVFolderQuotaScan(folderPath string) error {
- mutex.Lock()
- defer mutex.Unlock()
- var err error
- indexToRemove := -1
- for i, s := range activeVFoldersQuotaScan {
- if s.MappedPath == folderPath {
- indexToRemove = i
- break
- }
- }
- if indexToRemove >= 0 {
- activeVFoldersQuotaScan[indexToRemove] = activeVFoldersQuotaScan[len(activeVFoldersQuotaScan)-1]
- activeVFoldersQuotaScan = activeVFoldersQuotaScan[:len(activeVFoldersQuotaScan)-1]
- } else {
- err = fmt.Errorf("quota scan to remove not found for user: %#v", folderPath)
- logger.Warn(logSender, "", "error: %v", err)
- }
- return err
- }
- // CloseActiveConnection closes an active SFTP connection.
- // It returns true on success
- func CloseActiveConnection(connectionID string) bool {
- result := false
- mutex.RLock()
- defer mutex.RUnlock()
- if c, ok := openConnections[connectionID]; ok {
- err := c.close()
- c.Log(logger.LevelDebug, logSender, "close connection requested, close err: %v", err)
- result = true
- }
- return result
- }
- // GetConnectionsStats returns stats for active connections
- func GetConnectionsStats() []ConnectionStatus {
- mutex.RLock()
- defer mutex.RUnlock()
- stats := []ConnectionStatus{}
- for _, c := range openConnections {
- conn := ConnectionStatus{
- Username: c.User.Username,
- ConnectionID: c.ID,
- ClientVersion: c.ClientVersion,
- RemoteAddress: c.RemoteAddr.String(),
- ConnectionTime: utils.GetTimeAsMsSinceEpoch(c.StartTime),
- LastActivity: utils.GetTimeAsMsSinceEpoch(c.lastActivity),
- Protocol: c.protocol,
- Transfers: []connectionTransfer{},
- SSHCommand: c.command,
- }
- for _, t := range activeTransfers {
- if t.connectionID == c.ID {
- if t.lastActivity.UnixNano() > c.lastActivity.UnixNano() {
- conn.LastActivity = utils.GetTimeAsMsSinceEpoch(t.lastActivity)
- }
- var operationType string
- var size int64
- if t.transferType == transferUpload {
- operationType = operationUpload
- size = t.bytesReceived
- } else {
- operationType = operationDownload
- size = t.bytesSent
- }
- connTransfer := connectionTransfer{
- OperationType: operationType,
- StartTime: utils.GetTimeAsMsSinceEpoch(t.start),
- Size: size,
- LastActivity: utils.GetTimeAsMsSinceEpoch(t.lastActivity),
- Path: c.fs.GetRelativePath(t.path),
- }
- conn.Transfers = append(conn.Transfers, connTransfer)
- }
- }
- stats = append(stats, conn)
- }
- return stats
- }
- func startIdleTimer(maxIdleTime time.Duration) {
- idleTimeout = maxIdleTime
- go func() {
- for range time.Tick(5 * time.Minute) {
- CheckIdleConnections()
- }
- }()
- }
- // CheckIdleConnections disconnects clients idle for too long, based on IdleTimeout setting
- func CheckIdleConnections() {
- mutex.RLock()
- defer mutex.RUnlock()
- for _, c := range openConnections {
- idleTime := time.Since(c.lastActivity)
- for _, t := range activeTransfers {
- if t.connectionID == c.ID {
- transferIdleTime := time.Since(t.lastActivity)
- if transferIdleTime < idleTime {
- c.Log(logger.LevelDebug, logSender, "idle time: %v setted to transfer idle time: %v",
- idleTime, transferIdleTime)
- idleTime = transferIdleTime
- }
- }
- }
- if idleTime > idleTimeout {
- err := c.close()
- c.Log(logger.LevelInfo, logSender, "close idle connection, idle time: %v, close error: %v", idleTime, err)
- }
- }
- }
- func addConnection(c Connection) {
- mutex.Lock()
- defer mutex.Unlock()
- openConnections[c.ID] = c
- metrics.UpdateActiveConnectionsSize(len(openConnections))
- c.Log(logger.LevelDebug, logSender, "connection added, num open connections: %v", len(openConnections))
- }
- func removeConnection(c Connection) {
- mutex.Lock()
- defer mutex.Unlock()
- delete(openConnections, c.ID)
- metrics.UpdateActiveConnectionsSize(len(openConnections))
- // we have finished to send data here and most of the time the underlying network connection
- // is already closed. Sometime a client can still be reading the last sended data, so we set
- // a deadline instead of directly closing the network connection.
- // Setting a deadline on an already closed connection has no effect.
- // We only need to ensure that a connection will not remain indefinitely open and so the
- // underlying file descriptor is not released.
- // This should protect us against buggy clients and edge cases.
- c.netConn.SetDeadline(time.Now().Add(2 * time.Minute)) //nolint:errcheck
- c.Log(logger.LevelDebug, logSender, "connection removed, num open connections: %v", len(openConnections))
- }
- func addTransfer(transfer *Transfer) {
- mutex.Lock()
- defer mutex.Unlock()
- activeTransfers = append(activeTransfers, transfer)
- }
- func removeTransfer(transfer *Transfer) error {
- mutex.Lock()
- defer mutex.Unlock()
- var err error
- indexToRemove := -1
- for i, v := range activeTransfers {
- if v == transfer {
- indexToRemove = i
- break
- }
- }
- if indexToRemove >= 0 {
- activeTransfers[indexToRemove] = activeTransfers[len(activeTransfers)-1]
- activeTransfers = activeTransfers[:len(activeTransfers)-1]
- } else {
- logger.Warn(logSender, transfer.connectionID, "transfer to remove not found!")
- err = fmt.Errorf("transfer to remove not found")
- }
- return err
- }
- func updateConnectionActivity(id string) {
- mutex.Lock()
- defer mutex.Unlock()
- if c, ok := openConnections[id]; ok {
- c.lastActivity = time.Now()
- openConnections[id] = c
- }
- }
- func isAtomicUploadEnabled() bool {
- return uploadMode == uploadModeAtomic || uploadMode == uploadModeAtomicWithResume
- }
- func executeNotificationCommand(a actionNotification) error {
- if !filepath.IsAbs(actions.Hook) {
- err := fmt.Errorf("invalid notification command %#v", actions.Hook)
- logger.Warn(logSender, "", "unable to execute notification command: %v", err)
- return err
- }
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
- defer cancel()
- cmd := exec.CommandContext(ctx, actions.Hook, a.Action, a.Username, a.Path, a.TargetPath, a.SSHCmd)
- cmd.Env = append(os.Environ(), a.AsEnvVars()...)
- startTime := time.Now()
- err := cmd.Run()
- logger.Debug(logSender, "", "executed command %#v with arguments: %#v, %#v, %#v, %#v, %#v, elapsed: %v, error: %v",
- actions.Hook, a.Action, a.Username, a.Path, a.TargetPath, a.SSHCmd, time.Since(startTime), err)
- return err
- }
- func executeAction(a actionNotification) error {
- if !utils.IsStringInSlice(a.Action, actions.ExecuteOn) {
- return errUnconfiguredAction
- }
- if len(actions.Hook) == 0 {
- logger.Warn(logSender, "", "Unable to send notification, no hook is defined")
- return errNoHook
- }
- if strings.HasPrefix(actions.Hook, "http") {
- var url *url.URL
- url, err := url.Parse(actions.Hook)
- if err != nil {
- logger.Warn(logSender, "", "Invalid hook %#v for operation %#v: %v", actions.Hook, a.Action, err)
- return err
- }
- startTime := time.Now()
- httpClient := httpclient.GetHTTPClient()
- resp, err := httpClient.Post(url.String(), "application/json", bytes.NewBuffer(a.AsJSON()))
- respCode := 0
- if err == nil {
- respCode = resp.StatusCode
- resp.Body.Close()
- if respCode != http.StatusOK {
- err = errUnexpectedHTTResponse
- }
- }
- logger.Debug(logSender, "", "notified operation %#v to URL: %v status code: %v, elapsed: %v err: %v",
- a.Action, url.String(), respCode, time.Since(startTime), err)
- return err
- }
- return executeNotificationCommand(a)
- }
|