#!/usr/bin/env python
 
- from datetime import datetime
 
- import sys
 
- import time
 
- import pytz
 
- import requests
 
- try:
 
- 	import urllib.parse as urlparse
 
- except ImportError:
 
- 	import urlparse
 
# --- configuration -----------------------------------------------------------

# Base URL of the SFTPGo installation to talk to; every API path used by this
# script is joined onto it.
base_url = "http://127.0.0.1:8080"

# Passed as ``verify=`` to every HTTP request. Set to False to skip TLS
# certificate validation.
verify_tls_cert = True

# Credentials of a valid SFTPGo admin, used with HTTP basic auth to obtain the
# API access token.
admin_user = "admin"
admin_password = "password"
 
class CheckRetention:
    """Start a retention check for every SFTPGo user via the REST API.

    The users are listed page by page and, for each of them, a retention
    check is started and then polled until it is no longer reported as
    running. Any unexpected HTTP status code is logged and terminates the
    process with exit status 1.
    """

    def __init__(self):
        # page size and current position used while listing the users
        self.limit = 100
        self.offset = 0
        # cached bearer token and its timezone-aware expiration time,
        # both filled in by checkAccessToken
        self.access_token = ""
        self.access_token_expiration = None

    def printLog(self, message):
        """Print ``message`` to stdout prefixed with the current local time."""
        print("{} - {}".format(datetime.now(), message))

    @staticmethod
    def _quotePathSegment(value):
        """Return ``value`` percent-encoded for use as one URL path segment.

        Every reserved character, "/" included, is escaped so the value
        cannot alter the structure of the URL it is inserted into. Text is
        encoded as UTF-8 before quoting.
        """
        # imported here so the helper is self-contained on Python 2 and 3
        try:
            from urllib.parse import quote
        except ImportError:
            from urllib import quote
        # quoting bytes behaves the same on both Python versions, the
        # Python 2 quote fails on non-ASCII unicode strings
        if not isinstance(value, bytes):
            value = value.encode("utf-8")
        return quote(value, safe="")

    def checkAccessToken(self):
        """Make sure ``self.access_token`` holds a usable token.

        The cached token is kept if it expires in more than 180 seconds,
        otherwise a new one is requested using the admin credentials. The
        process exits with status 1 if the token request does not return
        HTTP 200.
        """
        if self.access_token != "" and self.access_token_expiration:
            expire_diff = self.access_token_expiration - datetime.now(tz=pytz.UTC)
            # we don't use total_seconds to be python 2 compatible
            seconds_to_expire = expire_diff.days * 86400 + expire_diff.seconds
            if seconds_to_expire > 180:
                return

        auth = requests.auth.HTTPBasicAuth(admin_user, admin_password)
        r = requests.get(urlparse.urljoin(base_url, "api/v2/token"), auth=auth,
                         verify=verify_tls_cert, timeout=10)
        if r.status_code != 200:
            self.printLog("error getting access token: {}".format(r.text))
            sys.exit(1)
        # parse the response body only once
        token = r.json()
        self.access_token = token["access_token"]
        # the expiration has no UTC offset in the parsed format, so the naive
        # datetime is localized as UTC to compare it with an aware "now"
        expires_at = datetime.strptime(token["expires_at"], "%Y-%m-%dT%H:%M:%SZ")
        self.access_token_expiration = pytz.timezone("UTC").localize(expires_at)

    def getAuthHeader(self):
        """Return the ``Authorization`` header, refreshing the token if needed."""
        self.checkAccessToken()
        return {"Authorization": "Bearer " + self.access_token}

    def waitForRentionCheck(self, username):
        """Block until no running retention check is listed for ``username``.

        The list of running checks is polled every 2 seconds. The process
        exits with status 1 if the list request does not return HTTP 200.
        """
        while True:
            # the header is rebuilt at each iteration so that the token is
            # refreshed if the wait lasts longer than its validity
            auth_header = self.getAuthHeader()
            r = requests.get(urlparse.urljoin(base_url, "api/v2/retention/users/checks"),
                             headers=auth_header, verify=verify_tls_cert, timeout=10)
            if r.status_code != 200:
                self.printLog("error getting retention checks while waiting for {}: {}".format(username, r.text))
                sys.exit(1)

            if not any(check["username"] == username for check in r.json()):
                break

            self.printLog("waiting for the retention check to complete for user {}".format(username))
            time.sleep(2)

        self.printLog("retention check for user {} finished".format(username))

    def checkUserRetention(self, username):
        """Start a retention check for ``username`` and wait for it to finish.

        The process exits with status 1 if the request to start the check
        does not return HTTP 202.
        """
        self.printLog("starting retention check for user {}".format(username))
        auth_header = self.getAuthHeader()
        # a single rule covering the whole user home
        # NOTE(review): "retention" is presumably expressed in hours
        # (168 = 7 days) - confirm against the SFTPGo REST API documentation
        retention = [
            {
                "path": "/",
                "retention": 168,
                "delete_empty_dirs": True,
                "ignore_user_permissions": False
            }
        ]
        # the username is percent-encoded so that reserved characters
        # (e.g. "/", "?", "#", spaces) cannot change the requested endpoint
        check_path = "api/v2/retention/users/" + self._quotePathSegment(username) + "/check"
        r = requests.post(urlparse.urljoin(base_url, check_path), headers=auth_header,
                          json=retention, verify=verify_tls_cert, timeout=10)
        if r.status_code != 202:
            self.printLog("error starting retention check for user {}: {}".format(username, r.text))
            sys.exit(1)

        self.waitForRentionCheck(username)

    def checkUsersRetention(self):
        """Run the retention check for all the users, one page at a time.

        Listing starts at ``self.offset``, which is advanced after each page
        and is not reset when the method returns. The process exits with
        status 1 if a users request does not return HTTP 200.
        """
        while True:
            self.printLog("get users, limit {} offset {}".format(self.limit, self.offset))
            auth_header = self.getAuthHeader()
            payload = {"limit": self.limit, "offset": self.offset}
            r = requests.get(urlparse.urljoin(base_url, "api/v2/users"), headers=auth_header,
                             params=payload, verify=verify_tls_cert, timeout=10)
            if r.status_code != 200:
                self.printLog("error getting users: {}".format(r.text))
                sys.exit(1)

            users = r.json()
            for user in users:
                self.checkUserRetention(user["username"])

            self.offset += len(users)
            # a page shorter than the limit is the last one
            if len(users) < self.limit:
                break
 
if __name__ == '__main__':
    # script entry point: run the retention check for every user
    checker = CheckRetention()
    checker.checkUsersRetention()
 
 
  |