webtask.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. // Copyright (C) 2024 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package httpd
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "sync"
  19. "time"
  20. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  21. "github.com/drakkan/sftpgo/v2/internal/logger"
  22. "github.com/drakkan/sftpgo/v2/internal/util"
  23. )
  24. var (
  25. webTaskMgr webTaskManager
  26. )
  27. func newWebTaskManager(isShared int) webTaskManager {
  28. if isShared == 1 {
  29. logger.Info(logSender, "", "using provider task manager")
  30. return &dbTaskManager{}
  31. }
  32. logger.Info(logSender, "", "using memory task manager")
  33. return &memoryTaskManager{}
  34. }
  35. type webTaskManager interface {
  36. Add(data webTaskData) error
  37. Get(ID string) (webTaskData, error)
  38. Cleanup()
  39. }
  40. type webTaskData struct {
  41. ID string `json:"id"`
  42. User string `json:"user"`
  43. Path string `json:"path"`
  44. Target string `json:"target"`
  45. Timestamp int64 `json:"ts"`
  46. Status int `json:"status"` // 0 in progress or http status code (200 ok, 403 and so on)
  47. }
  48. type memoryTaskManager struct {
  49. tasks sync.Map
  50. }
  51. func (m *memoryTaskManager) Add(data webTaskData) error {
  52. m.tasks.Store(data.ID, &data)
  53. return nil
  54. }
  55. func (m *memoryTaskManager) Get(ID string) (webTaskData, error) {
  56. data, ok := m.tasks.Load(ID)
  57. if !ok {
  58. return webTaskData{}, util.NewRecordNotFoundError(fmt.Sprintf("task for ID %q not found", ID))
  59. }
  60. return *data.(*webTaskData), nil
  61. }
  62. func (m *memoryTaskManager) Cleanup() {
  63. m.tasks.Range(func(key, value any) bool {
  64. data := value.(*webTaskData)
  65. if data.Timestamp < util.GetTimeAsMsSinceEpoch(time.Now().Add(-5*time.Minute)) {
  66. m.tasks.Delete(key)
  67. }
  68. return true
  69. })
  70. }
  71. type dbTaskManager struct{}
  72. func (m *dbTaskManager) Add(data webTaskData) error {
  73. session := dataprovider.Session{
  74. Key: data.ID,
  75. Data: data,
  76. Type: dataprovider.SessionTypeWebTask,
  77. Timestamp: data.Timestamp,
  78. }
  79. return dataprovider.AddSharedSession(session)
  80. }
  81. func (m *dbTaskManager) Get(ID string) (webTaskData, error) {
  82. sess, err := dataprovider.GetSharedSession(ID)
  83. if err != nil {
  84. return webTaskData{}, err
  85. }
  86. d := sess.Data.([]byte)
  87. var data webTaskData
  88. err = json.Unmarshal(d, &data)
  89. return data, err
  90. }
  91. func (m *dbTaskManager) Cleanup() {
  92. dataprovider.CleanupSharedSessions(dataprovider.SessionTypeWebTask, time.Now().Add(-5*time.Minute)) //nolint:errcheck
  93. }