#!/usr/bin/env python from datetime import datetime import sys import time import pytz import requests try: import urllib.parse as urlparse except ImportError: import urlparse # change base_url to point to your SFTPGo installation base_url = "http://127.0.0.1:8080" # set to False if you want to skip TLS certificate validation verify_tls_cert = True # set the credentials for a valid admin here admin_user = "admin" admin_password = "password" class CheckRetention: def __init__(self): self.limit = 100 self.offset = 0 self.access_token = "" self.access_token_expiration = None def printLog(self, message): print("{} - {}".format(datetime.now(), message)) def checkAccessToken(self): if self.access_token != "" and self.access_token_expiration: expire_diff = self.access_token_expiration - datetime.now(tz=pytz.UTC) # we don't use total_seconds to be python 2 compatible seconds_to_expire = expire_diff.days * 86400 + expire_diff.seconds if seconds_to_expire > 180: return auth = requests.auth.HTTPBasicAuth(admin_user, admin_password) r = requests.get(urlparse.urljoin(base_url, "api/v2/token"), auth=auth, verify=verify_tls_cert, timeout=10) if r.status_code != 200: self.printLog("error getting access token: {}".format(r.text)) sys.exit(1) self.access_token = r.json()["access_token"] self.access_token_expiration = pytz.timezone("UTC").localize(datetime.strptime(r.json()["expires_at"], "%Y-%m-%dT%H:%M:%SZ")) def getAuthHeader(self): self.checkAccessToken() return {"Authorization": "Bearer " + self.access_token} def waitForRentionCheck(self, username): while True: auth_header = self.getAuthHeader() r = requests.get(urlparse.urljoin(base_url, "api/v2/retention/users/checks"), headers=auth_header, verify=verify_tls_cert, timeout=10) if r.status_code != 200: self.printLog("error getting retention checks while waiting for {}: {}".format(username, r.text)) sys.exit(1) checking = False for check in r.json(): if check["username"] == username: checking = True if not checking: break self.printLog("waiting for the retention check to complete for user {}".format(username)) time.sleep(2) self.printLog("retention check for user {} finished".format(username)) def checkUserRetention(self, username): self.printLog("starting retention check for user {}".format(username)) auth_header = self.getAuthHeader() retention = [ { "path": "/", "retention": 168, "delete_empty_dirs": True, "ignore_user_permissions": False } ] r = requests.post(urlparse.urljoin(base_url, "api/v2/retention/users/" + username + "/check"), headers=auth_header, json=retention, verify=verify_tls_cert, timeout=10) if r.status_code != 202: self.printLog("error starting retention check for user {}: {}".format(username, r.text)) sys.exit(1) self.waitForRentionCheck(username) def checkUsersRetention(self): while True: self.printLog("get users, limit {} offset {}".format(self.limit, self.offset)) auth_header = self.getAuthHeader() payload = {"limit":self.limit, "offset":self.offset} r = requests.get(urlparse.urljoin(base_url, "api/v2/users"), headers=auth_header, params=payload, verify=verify_tls_cert, timeout=10) if r.status_code != 200: self.printLog("error getting users: {}".format(r.text)) sys.exit(1) users = r.json() for user in users: self.checkUserRetention(user["username"]) self.offset += len(users) if len(users) < self.limit: break if __name__ == '__main__': c = CheckRetention() c.checkUsersRetention()