utils.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. import requests
  2. from requests.exceptions import ConnectionError
  3. import re
  4. from proxypool.proxy import Proxy
  5. def parse_redis_connection_string(connection_string):
  6. """
  7. parse a redis connection string, for example:
  8. redis://[password]@host:port
  9. rediss://[password]@host:port
  10. :param connection_string:
  11. :return:
  12. """
  13. result = re.match('rediss?:\/\/(.*?)@(.*?):(\d+)', connection_string)
  14. return result.group(2), int(result.group(3)), (result.group(1) or None) if result \
  15. else ('localhost', 6379, None)
  16. def is_valid_proxy(data):
  17. """
  18. is data is valid proxy format
  19. :param data:
  20. :return:
  21. """
  22. return re.match('\d+\.\d+\.\d+\.\d+\:\d+', data)
  23. def convert_proxy_or_proxies(data):
  24. """
  25. convert list of str to valid proxies or proxy
  26. :param data:
  27. :return:
  28. """
  29. print(data)
  30. if not data:
  31. return None
  32. if isinstance(data, list):
  33. result = []
  34. for item in data:
  35. # skip invalid item
  36. item = item.strip()
  37. if not is_valid_proxy(item): continue
  38. host, port = item.split(':')
  39. result.append(Proxy(host=host, port=int(port)))
  40. return result
  41. if isinstance(data, str) and is_valid_proxy(data):
  42. host, port = data.split(':')
  43. return Proxy(host=host, port=int(port))