| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- #!/usr/bin/env python
- from datetime import datetime
- import sys
- import time
- import pytz
- import requests
- try:
- import urllib.parse as urlparse
- except ImportError:
- import urlparse
# Change base_url to point to your SFTPGo installation. The relative API
# paths used below ("api/v2/...") are resolved against it with urljoin.
base_url = "http://127.0.0.1:8080"
# Passed as the `verify` argument of every requests call; set to False if you
# want to skip TLS certificate validation.
verify_tls_cert = True
# Set the credentials for a valid admin here. They are sent with HTTP Basic
# auth to "api/v2/token" in order to obtain the API access token.
admin_user = "admin"
admin_password = "password"
- # set your update conditions here
def needQuotaUpdate(user):
    """Tell whether a quota scan should be started for ``user``.

    ``user`` is a mapping with the keys "status", "quota_size" and
    "quota_files". A scan is skipped for inactive users (status 0) and for
    users with no quota restrictions (both quota fields equal to 0).
    """
    # inactive users are never scanned; the quota fields are not even read
    if user["status"] == 0:
        return False
    # at least one quota restriction must be set for a scan to be useful
    return user["quota_size"] != 0 or user["quota_files"] != 0
class UpdateQuota:
    """Start a quota scan for every user that needs one and wait for it.

    Users are listed page by page through the REST API at ``base_url``; for
    each user accepted by ``needQuotaUpdate`` a scan is started and polled
    until it no longer appears among the running scans. Any unexpected HTTP
    status is logged and terminates the process with ``sys.exit(1)``.
    """

    def __init__(self):
        # page size and position used while listing users
        self.limit = 100
        self.offset = 0
        # cached bearer token and its timezone-aware (UTC) expiration
        self.access_token = ""
        self.access_token_expiration = None

    @staticmethod
    def _quoteUsername(username):
        """Percent-encode ``username`` for use as a single URL path segment."""
        quote = getattr(urlparse, "quote", None)
        if quote is None:
            # Python 2: ``urlparse`` is the legacy module, quote lives in urllib
            from urllib import quote
        if not isinstance(username, str):
            # Python 2 unicode string: quote() there only handles byte strings
            username = username.encode("utf-8")
        # safe="" so that "/" is encoded too and cannot split the path segment
        return quote(username, safe="")

    def printLog(self, message):
        """Print ``message`` prefixed with the current local date and time."""
        print("{} - {}".format(datetime.now(), message))

    def checkAccessToken(self):
        """Make sure a valid access token is cached, requesting one if needed.

        The cached token is kept while it has more than 180 seconds of
        validity left. Exits the process if the token request does not
        return HTTP 200.
        """
        if self.access_token != "" and self.access_token_expiration:
            expire_diff = self.access_token_expiration - datetime.now(tz=pytz.UTC)
            # we don't use total_seconds to be python 2 compatible
            seconds_to_expire = expire_diff.days * 86400 + expire_diff.seconds
            if seconds_to_expire > 180:
                return

        auth = requests.auth.HTTPBasicAuth(admin_user, admin_password)
        r = requests.get(urlparse.urljoin(base_url, "api/v2/token"), auth=auth, verify=verify_tls_cert, timeout=10)
        if r.status_code != 200:
            self.printLog("error getting access token: {}".format(r.text))
            sys.exit(1)
        # decode the response body once and read both fields from it
        token = r.json()
        expires_at = datetime.strptime(token["expires_at"], "%Y-%m-%dT%H:%M:%SZ")
        self.access_token = token["access_token"]
        self.access_token_expiration = pytz.timezone("UTC").localize(expires_at)

    def getAuthHeader(self):
        """Return the ``Authorization`` header carrying a valid bearer token."""
        self.checkAccessToken()
        return {"Authorization": "Bearer " + self.access_token}

    def waitForQuotaUpdate(self, username):
        """Poll the running scans every 2 seconds until ``username`` is absent.

        Exits the process if the scans listing does not return HTTP 200.
        """
        while True:
            auth_header = self.getAuthHeader()
            r = requests.get(urlparse.urljoin(base_url, "api/v2/quotas/users/scans"), headers=auth_header,
                             verify=verify_tls_cert, timeout=10)
            if r.status_code != 200:
                self.printLog("error getting quota scans while waiting for {}: {}".format(username, r.text))
                sys.exit(1)

            # the scan is over as soon as the user is no longer listed
            if not any(scan["username"] == username for scan in r.json()):
                break

            self.printLog("waiting for the quota scan to complete for user {}".format(username))
            time.sleep(2)

        self.printLog("quota update for user {} finished".format(username))

    def updateUserQuota(self, username):
        """Start a quota scan for ``username`` and block until it completes.

        Exits the process if the scan request does not return HTTP 202.
        """
        self.printLog("starting quota update for user {}".format(username))
        auth_header = self.getAuthHeader()
        # the username is percent-encoded: characters such as " ", "/", "?"
        # or "#" would otherwise alter the path of the request URL
        scan_path = "api/v2/quotas/users/" + self._quoteUsername(username) + "/scan"
        r = requests.post(urlparse.urljoin(base_url, scan_path), headers=auth_header,
                          verify=verify_tls_cert, timeout=10)
        if r.status_code != 202:
            self.printLog("error starting quota scan for user {}: {}".format(username, r.text))
            sys.exit(1)
        self.waitForQuotaUpdate(username)

    def updateUsersQuota(self):
        """Walk all users, starting from ``self.offset``, and scan as needed.

        Pages of ``self.limit`` users are requested until a page shorter than
        the limit is returned. Exits the process if a users listing does not
        return HTTP 200.
        """
        while True:
            self.printLog("get users, limit {} offset {}".format(self.limit, self.offset))
            auth_header = self.getAuthHeader()
            payload = {"limit": self.limit, "offset": self.offset}
            r = requests.get(urlparse.urljoin(base_url, "api/v2/users"), headers=auth_header, params=payload,
                             verify=verify_tls_cert, timeout=10)
            if r.status_code != 200:
                self.printLog("error getting users: {}".format(r.text))
                sys.exit(1)

            users = r.json()
            for user in users:
                if needQuotaUpdate(user):
                    self.updateUserQuota(user["username"])
                else:
                    self.printLog("user {} does not need a quota update".format(user["username"]))

            self.offset += len(users)
            # a short page means there are no more users to fetch
            if len(users) < self.limit:
                break
if __name__ == '__main__':
    # script entry point: scan the quota of every user that needs it
    quota_updater = UpdateQuota()
    quota_updater.updateUsersQuota()
|