http.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. # coding=utf-8
  2. """
  3. HTTP请求工具模块
  4. HTTP utilities module for DDNS project.
  5. Provides common HTTP functionality including ssl, proxy, and basicauth.
  6. @author: NewFuture
  7. """
  8. from logging import getLogger
  9. from re import compile
  10. import ssl
  11. import os
  12. try: # python 3
  13. from urllib.request import ( # noqa: F401
  14. HTTPBasicAuthHandler,
  15. HTTPSHandler,
  16. BaseHandler,
  17. Request,
  18. HTTPPasswordMgrWithDefaultRealm,
  19. HTTPDefaultErrorHandler,
  20. ProxyHandler,
  21. build_opener,
  22. )
  23. from urllib.parse import quote, urlencode, unquote
  24. except ImportError: # python 2
  25. from urllib2 import ( # type: ignore[no-redef]
  26. Request,
  27. HTTPSHandler,
  28. ProxyHandler,
  29. HTTPDefaultErrorHandler,
  30. HTTPPasswordMgrWithDefaultRealm,
  31. HTTPBasicAuthHandler,
  32. build_opener,
  33. )
  34. from urllib import urlencode, quote, unquote # type: ignore[no-redef]
  35. __all__ = ["send_http_request", "HttpResponse", "quote", "urlencode"]
  36. logger = getLogger().getChild(__name__)
  37. _AUTH_URL_RE = compile(r"^(https?://)([^:/?#]+):([^@]+)@(.+)$")
  38. class NoHTTPErrorProcessor(HTTPDefaultErrorHandler): # type: ignore[misc]
  39. """自定义HTTP错误处理器,处理所有HTTP错误状态码,返回响应而不抛出异常"""
  40. def http_error_default(self, req, fp, code, msg, hdrs):
  41. """处理所有HTTP错误状态码,返回响应而不抛出异常"""
  42. logger.warning("HTTP error %s: %s", code, msg)
  43. return fp
  44. class SSLAutoFallbackHandler(HTTPSHandler): # type: ignore[misc]
  45. """SSL自动降级处理器,处理 unable to get local issuer certificate 错误"""
  46. # 类级别的SSL上下文缓存
  47. _ssl_cache = {} # type: dict[str, ssl.SSLContext]
  48. def __init__(self, verify_ssl):
  49. # type: (bool | str) -> None
  50. self._verify_ssl = verify_ssl
  51. ssl_context = self._get_ssl_context()
  52. # 兼容性:优先使用context参数,失败时降级
  53. try: # python 3 / python 2.7.9+
  54. HTTPSHandler.__init__(self, context=ssl_context)
  55. except (TypeError, AttributeError): # python 2.7.8-
  56. HTTPSHandler.__init__(self)
  57. def _get_ssl_context(self):
  58. # type: () -> ssl.SSLContext | None
  59. """创建或获取缓存的SSLContext"""
  60. # 缓存键
  61. cache_key = "default"
  62. if not self._verify_ssl:
  63. if not hasattr(ssl, "_create_unverified_context"):
  64. return None
  65. cache_key = "unverified"
  66. if cache_key not in self._ssl_cache:
  67. self._ssl_cache[cache_key] = ssl._create_unverified_context()
  68. elif hasattr(self._verify_ssl, "lower") and self._verify_ssl.lower() not in ("auto", "true"): # type: ignore
  69. cache_key = str(self._verify_ssl)
  70. if cache_key not in self._ssl_cache:
  71. self._ssl_cache[cache_key] = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cache_key)
  72. elif cache_key not in self._ssl_cache:
  73. ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
  74. if not ssl_context.get_ca_certs():
  75. logger.warning("No system CA certificates found, loading default CA certificates")
  76. _load_system_ca_certs(ssl_context)
  77. self._ssl_cache[cache_key] = ssl_context
  78. return self._ssl_cache[cache_key]
  79. def https_open(self, req):
  80. """处理HTTPS请求,自动处理SSL错误"""
  81. try:
  82. return HTTPSHandler.https_open(self, req)
  83. except Exception as e:
  84. # SSL auto模式:只处理 unable to get local issuer certificate 错误
  85. if self._verify_ssl == "auto" and "unable to get local issuer certificate" in str(e).lower():
  86. msg = "unable to get local issuer certificate, switching to unverified connection for %s"
  87. logger.warning(msg, req.get_full_url())
  88. self._verify_ssl = False
  89. # 创建不验证SSL的临时处理器重试
  90. try: # python 3 / python 2.7.9+
  91. temp_handler = HTTPSHandler(context=self._get_ssl_context())
  92. except (TypeError, AttributeError): # python 2.7.8-
  93. temp_handler = HTTPSHandler()
  94. return temp_handler.https_open(req)
  95. else:
  96. raise
  97. class HttpResponse(object):
  98. """HTTP响应封装类"""
  99. def __init__(self, status, reason, headers, body):
  100. # type: (int, str, Any, str) -> None
  101. """
  102. 初始化HTTP响应对象
  103. Args:
  104. status (int): HTTP状态码
  105. reason (str): 状态原因短语
  106. headers (Any): 响应头对象,直接使用 response.info()
  107. body (str): 响应体内容
  108. """
  109. self.status = status
  110. self.reason = reason
  111. self.headers = headers
  112. self.body = body
  113. def _load_system_ca_certs(ssl_context):
  114. # type: (ssl.SSLContext) -> None
  115. """加载系统CA证书"""
  116. ca_paths = [
  117. # Linux/Unix常用路径
  118. "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu
  119. "/etc/pki/tls/certs/ca-bundle.crt", # RedHat/CentOS
  120. "/etc/ssl/ca-bundle.pem", # OpenSUSE
  121. "/etc/ssl/cert.pem", # OpenBSD
  122. "/usr/local/share/certs/ca-root-nss.crt", # FreeBSD
  123. "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", # Fedora/RHEL
  124. # macOS路径
  125. "/usr/local/etc/openssl/cert.pem", # macOS with Homebrew
  126. "/opt/local/etc/openssl/cert.pem", # macOS with MacPorts
  127. ]
  128. for ca_path in ca_paths:
  129. if os.path.isfile(ca_path):
  130. try:
  131. ssl_context.load_verify_locations(ca_path)
  132. logger.info("Loaded CA certificates from: %s", ca_path)
  133. return # 成功加载后立即返回
  134. except Exception as e:
  135. logger.warning("Failed to load CA certificates from %s: %s", ca_path, e)
  136. def send_http_request(method, url, body=None, headers=None, proxy=None, verify_ssl=True, auth_handler=None):
  137. # type: (str, str, str | bytes | None, dict[str, str] | None, str | None, bool | str, BaseHandler | None) -> HttpResponse # noqa: E501
  138. """
  139. 发送HTTP/HTTPS请求,支持重定向跟随和灵活的SSL验证
  140. Args:
  141. method (str): HTTP方法,如GET、POST等
  142. url (str): 请求的URL,支持嵌入式认证格式 https://user:[email protected]
  143. body (str | bytes | None): 请求体
  144. headers (dict[str, str] | None): 请求头
  145. proxy (str | None): 代理地址,格式为 http://proxy:port
  146. verify_ssl (bool | str): SSL验证配置
  147. - True: 启用标准SSL验证
  148. - False: 禁用SSL验证
  149. - "auto": 启用验证,失败时自动回退到不验证
  150. - str: 自定义CA证书文件路径
  151. auth_handler (BaseHandler | None): 自定义认证处理器
  152. Returns:
  153. HttpResponse: 响应对象
  154. Raises:
  155. URLError: 如果请求失败
  156. ssl.SSLError: 如果SSL验证失败
  157. """
  158. # 解析URL以检查是否包含嵌入式认证信息
  159. m = _AUTH_URL_RE.match(url)
  160. if m:
  161. protocol, username, password, rest = m.groups()
  162. clean_url = protocol + rest
  163. password_mgr = HTTPPasswordMgrWithDefaultRealm() # 使用urllib的内置认证机制
  164. password_mgr.add_password(None, clean_url, unquote(username), unquote(password))
  165. auth_handler = HTTPBasicAuthHandler(password_mgr)
  166. url = clean_url
  167. # 准备请求
  168. if isinstance(body, str):
  169. body = body.encode("utf-8")
  170. req = Request(url, data=body)
  171. req.get_method = lambda: method # type: ignore[attr-defined]
  172. if headers:
  173. for key, value in headers.items():
  174. req.add_header(key, value)
  175. # 创建opener并发送请求
  176. handlers = [NoHTTPErrorProcessor(), SSLAutoFallbackHandler(verify_ssl=verify_ssl)] # type: list[BaseHandler]
  177. if proxy:
  178. handlers.append(ProxyHandler({"http": proxy, "https": proxy}))
  179. if auth_handler:
  180. handlers.append(auth_handler)
  181. opener = build_opener(*handlers)
  182. response = opener.open(req)
  183. # 处理响应
  184. response_headers = response.info()
  185. raw_body = response.read()
  186. decoded_body = _decode_response_body(raw_body, response_headers.get("Content-Type"))
  187. status_code = response.getcode()
  188. reason = getattr(response, "msg", "")
  189. return HttpResponse(status_code, reason, response_headers, decoded_body)
  190. def _decode_response_body(raw_body, content_type):
  191. # type: (bytes, str | None) -> str
  192. """解码HTTP响应体,优先使用UTF-8"""
  193. if not raw_body:
  194. return ""
  195. # 从Content-Type提取charset
  196. charsets = ["utf-8", "gbk", "ascii", "latin-1"]
  197. if content_type and "charset=" in content_type.lower():
  198. start = content_type.lower().find("charset=") + 8
  199. end = content_type.find(";", start)
  200. if end == -1:
  201. end = len(content_type)
  202. charset = content_type[start:end].strip("'\" ").lower()
  203. # 处理常见别名映射
  204. charset_aliases = {"gb2312": "gbk", "iso-8859-1": "latin-1"}
  205. charset = charset_aliases.get(charset, charset)
  206. if charset in charsets:
  207. charsets.remove(charset)
  208. charsets.insert(0, charset)
  209. # 按优先级尝试解码
  210. for encoding in charsets:
  211. try:
  212. return raw_body.decode(encoding)
  213. except (UnicodeDecodeError, LookupError):
  214. continue
  215. # 最终后备:UTF-8替换错误字符
  216. return raw_body.decode("utf-8", errors="replace")