checkretention 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. #!/usr/bin/env python
  2. from datetime import datetime
  3. import sys
  4. import time
  5. import pytz
  6. import requests
  7. try:
  8. import urllib.parse as urlparse
  9. except ImportError:
  10. import urlparse
  11. # change base_url to point to your SFTPGo installation
  12. base_url = "http://127.0.0.1:8080"
  13. # set to False if you want to skip TLS certificate validation
  14. verify_tls_cert = True
  15. # set the credentials for a valid admin here
  16. admin_user = "admin"
  17. admin_password = "password"
  18. class CheckRetention:
  19. def __init__(self):
  20. self.limit = 100
  21. self.offset = 0
  22. self.access_token = ""
  23. self.access_token_expiration = None
  24. def printLog(self, message):
  25. print("{} - {}".format(datetime.now(), message))
  26. def checkAccessToken(self):
  27. if self.access_token != "" and self.access_token_expiration:
  28. expire_diff = self.access_token_expiration - datetime.now(tz=pytz.UTC)
  29. # we don't use total_seconds to be python 2 compatible
  30. seconds_to_expire = expire_diff.days * 86400 + expire_diff.seconds
  31. if seconds_to_expire > 180:
  32. return
  33. auth = requests.auth.HTTPBasicAuth(admin_user, admin_password)
  34. r = requests.get(urlparse.urljoin(base_url, "api/v2/token"), auth=auth, verify=verify_tls_cert, timeout=10)
  35. if r.status_code != 200:
  36. self.printLog("error getting access token: {}".format(r.text))
  37. sys.exit(1)
  38. self.access_token = r.json()["access_token"]
  39. self.access_token_expiration = pytz.timezone("UTC").localize(datetime.strptime(r.json()["expires_at"],
  40. "%Y-%m-%dT%H:%M:%SZ"))
  41. def getAuthHeader(self):
  42. self.checkAccessToken()
  43. return {"Authorization": "Bearer " + self.access_token}
  44. def waitForRentionCheck(self, username):
  45. while True:
  46. auth_header = self.getAuthHeader()
  47. r = requests.get(urlparse.urljoin(base_url, "api/v2/retention/users/checks"), headers=auth_header, verify=verify_tls_cert,
  48. timeout=10)
  49. if r.status_code != 200:
  50. self.printLog("error getting retention checks while waiting for {}: {}".format(username, r.text))
  51. sys.exit(1)
  52. checking = False
  53. for check in r.json():
  54. if check["username"] == username:
  55. checking = True
  56. if not checking:
  57. break
  58. self.printLog("waiting for the retention check to complete for user {}".format(username))
  59. time.sleep(2)
  60. self.printLog("retention check for user {} finished".format(username))
  61. def checkUserRetention(self, username):
  62. self.printLog("starting retention check for user {}".format(username))
  63. auth_header = self.getAuthHeader()
  64. retention = [
  65. {
  66. "path": "/",
  67. "retention": 168,
  68. "delete_empty_dirs": True,
  69. "ignore_user_permissions": False
  70. }
  71. ]
  72. r = requests.post(urlparse.urljoin(base_url, "api/v2/retention/users/" + username + "/check"), headers=auth_header,
  73. json=retention, verify=verify_tls_cert, timeout=10)
  74. if r.status_code != 202:
  75. self.printLog("error starting retention check for user {}: {}".format(username, r.text))
  76. sys.exit(1)
  77. self.waitForRentionCheck(username)
  78. def checkUsersRetention(self):
  79. while True:
  80. self.printLog("get users, limit {} offset {}".format(self.limit, self.offset))
  81. auth_header = self.getAuthHeader()
  82. payload = {"limit":self.limit, "offset":self.offset}
  83. r = requests.get(urlparse.urljoin(base_url, "api/v2/users"), headers=auth_header, params=payload,
  84. verify=verify_tls_cert, timeout=10)
  85. if r.status_code != 200:
  86. self.printLog("error getting users: {}".format(r.text))
  87. sys.exit(1)
  88. users = r.json()
  89. for user in users:
  90. self.checkUserRetention(user["username"])
  91. self.offset += len(users)
  92. if len(users) < self.limit:
  93. break
  94. if __name__ == '__main__':
  95. c = CheckRetention()
  96. c.checkUsersRetention()