http.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. # coding=utf-8
  2. """
  3. HTTP请求工具模块
  4. HTTP utilities module for DDNS project.
  5. Provides common HTTP functionality including redirect following support.
  6. @author: NewFuture
  7. """
  8. from logging import getLogger
  9. import ssl
  10. import os
  11. try: # python 3
  12. from http.client import HTTPSConnection, HTTPConnection, HTTPException
  13. from urllib.parse import urlparse
  14. except ImportError: # python 2
  15. from httplib import HTTPSConnection, HTTPConnection, HTTPException # type: ignore[no-redef]
  16. from urlparse import urlparse # type: ignore[no-redef]
# Public names exported by a star-import of this module.
__all__ = ["send_http_request", "HttpResponse"]
# Module logger, obtained as a child of the root logger and named after this module.
logger = getLogger().getChild(__name__)
  19. class HttpResponse(object):
  20. """HTTP响应封装类"""
  21. def __init__(self, status, reason, headers, body):
  22. # type: (int, str, list[tuple[str, str]], str) -> None
  23. """
  24. 初始化HTTP响应对象
  25. Args:
  26. status (int): HTTP状态码
  27. reason (str): 状态原因短语
  28. headers (list[tuple[str, str]]): 响应头列表,保持原始格式和顺序
  29. body (str): 响应体内容
  30. """
  31. self.status = status
  32. self.reason = reason
  33. self.headers = headers
  34. self.body = body
  35. def get_header(self, name, default=None):
  36. # type: (str, str | None) -> str | None
  37. """
  38. 获取指定名称的头部值(不区分大小写)
  39. Args:
  40. name (str): 头部名称
  41. Returns:
  42. str | None: 头部值,如果不存在则返回None
  43. """
  44. name_lower = name.lower()
  45. for header_name, header_value in self.headers:
  46. if header_name.lower() == name_lower:
  47. return header_value
  48. return default
  49. def _create_connection(hostname, port, is_https, proxy, verify_ssl):
  50. # type: (str, int | None, bool, str | None, bool | str) -> HTTPConnection | HTTPSConnection
  51. """创建HTTP/HTTPS连接"""
  52. target = proxy or hostname
  53. if not is_https:
  54. conn = HTTPConnection(target, port)
  55. else:
  56. ssl_context = ssl.create_default_context()
  57. if verify_ssl is False:
  58. # 禁用SSL验证
  59. ssl_context.check_hostname = False
  60. ssl_context.verify_mode = ssl.CERT_NONE
  61. elif hasattr(verify_ssl, "lower") and verify_ssl.lower() not in ("auto", "true"): # type: ignore[union-attr]
  62. # 使用自定义CA证书 lower 判断 str/unicode 兼容 python2
  63. try:
  64. ssl_context.load_verify_locations(verify_ssl) # type: ignore[arg-type]
  65. except Exception as e:
  66. logger.error("Failed to load CA certificate from %s: %s", verify_ssl, e)
  67. else:
  68. # 默认验证,尝试加载系统证书
  69. _load_system_ca_certs(ssl_context)
  70. conn = HTTPSConnection(target, port, context=ssl_context)
  71. # 设置代理隧道
  72. if proxy:
  73. conn.set_tunnel(hostname, port) # type: ignore[attr-defined]
  74. return conn
  75. def _load_system_ca_certs(ssl_context):
  76. # type: (ssl.SSLContext) -> None
  77. """加载系统CA证书"""
  78. # 常见CA证书路径
  79. ca_paths = [
  80. # Linux/Unix常用路径
  81. "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu
  82. "/etc/pki/tls/certs/ca-bundle.crt", # RedHat/CentOS
  83. "/etc/ssl/ca-bundle.pem", # OpenSUSE
  84. "/etc/ssl/cert.pem", # OpenBSD
  85. "/usr/local/share/certs/ca-root-nss.crt", # FreeBSD
  86. "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", # Fedora/RHEL
  87. # macOS路径
  88. "/usr/local/etc/openssl/cert.pem", # macOS with Homebrew
  89. "/opt/local/etc/openssl/cert.pem", # macOS with MacPorts
  90. ]
  91. # Windows额外路径
  92. if os.name == "nt":
  93. ca_paths.append("C:\\Program Files\\Git\\mingw64\\ssl\\cert.pem")
  94. ca_paths.append("C:\\Program Files\\OpenSSL\\ssl\\cert.pem")
  95. loaded_count = 0
  96. for ca_path in ca_paths:
  97. if os.path.isfile(ca_path):
  98. try:
  99. ssl_context.load_verify_locations(ca_path)
  100. loaded_count += 1
  101. logger.debug("Loaded CA certificates from: %s", ca_path)
  102. except Exception as e:
  103. logger.debug("Failed to load CA certificates from %s: %s", ca_path, e)
  104. if loaded_count > 0:
  105. logger.debug("Successfully loaded CA certificates from %d locations", loaded_count)
  106. def _close_connection(conn):
  107. # type: (HTTPConnection | HTTPSConnection) -> None
  108. """关闭HTTP/HTTPS连接"""
  109. try:
  110. conn.close()
  111. except Exception as e:
  112. logger.warning("Failed to close connection: %s", e)
  113. def send_http_request(method, url, body=None, headers=None, proxy=None, max_redirects=5, verify_ssl=True):
  114. # type: (str, str, str | bytes | None, dict[str, str] | None, str | None, int, bool | str) -> HttpResponse
  115. """
  116. 发送HTTP/HTTPS请求,支持重定向跟随和灵活的SSL验证
  117. Send HTTP/HTTPS request with support for redirect following and flexible SSL verification.
  118. Args:
  119. method (str): HTTP方法,如GET、POST等
  120. url (str): 请求的URL
  121. body (str | bytes | None): 请求体
  122. headers (dict[str, str] | None): 请求头
  123. proxy (str | None): 代理地址
  124. max_redirects (int): 最大重定向次数
  125. verify_ssl (bool | str): 是否验证SSL证书
  126. Returns:
  127. HttpResponse: 响应对象,包含状态码、头部和解码后的内容
  128. Raises:
  129. HTTPException: 如果请求失败或重定向次数超过限制
  130. ssl.SSLError: 如果SSL验证失败
  131. """
  132. if max_redirects <= 0:
  133. raise HTTPException("Too many redirects")
  134. # 解析URL
  135. url_obj = urlparse(url)
  136. is_https = url_obj.scheme == "https"
  137. hostname = url_obj.hostname or url_obj.netloc.split(":")[0]
  138. request_path = "{}?{}".format(url_obj.path, url_obj.query) if url_obj.query else url_obj.path
  139. headers = headers or {}
  140. # 创建连接
  141. actual_verify_ssl = verify_ssl
  142. conn = _create_connection(hostname, url_obj.port, is_https, proxy, verify_ssl)
  143. # 执行请求,处理SSL错误
  144. try:
  145. conn.request(method, request_path, body, headers)
  146. response = conn.getresponse()
  147. except ssl.SSLError:
  148. _close_connection(conn)
  149. if verify_ssl == "auto" and is_https:
  150. logger.warning("SSL verification failed, switching to unverified connection %s", url)
  151. # 重新连接,忽略SSL验证
  152. conn = _create_connection(hostname, url_obj.port, is_https, proxy, False)
  153. conn.request(method, request_path, body, headers)
  154. response = conn.getresponse()
  155. actual_verify_ssl = False
  156. else:
  157. raise
  158. # 检查重定向
  159. status = response.status
  160. if 300 <= status < 400:
  161. location = response.getheader("Location")
  162. _close_connection(conn)
  163. if not location:
  164. # 无Location头的重定向
  165. logger.warning("Redirect status %d but no Location header", status)
  166. location = ""
  167. # 构建重定向URL
  168. redirect_url = _build_redirect_url(location, "{}://{}".format(url_obj.scheme, url_obj.netloc), url_obj.path)
  169. # 如果重定向URL没有查询字符串,但原始URL有,则附加
  170. if url_obj.query and "?" not in redirect_url:
  171. redirect_url += "?" + url_obj.query
  172. # 确定重定向方法:303或302+POST转为GET,其他保持原方法
  173. if status == 303 or (status == 302 and method == "POST"):
  174. method, body = "GET", None
  175. # 如果从POST转为GET,移除相关的头部
  176. if headers:
  177. headers = {k: v for k, v in headers.items() if k.lower() not in ("content-length", "content-type")}
  178. logger.info("Redirecting [%d] to: %s", status, redirect_url)
  179. # 递归处理重定向
  180. return send_http_request(method, redirect_url, body, headers, proxy, max_redirects - 1, actual_verify_ssl)
  181. # 处理最终响应
  182. content_type = response.getheader("Content-Type")
  183. response_headers = response.getheaders()
  184. raw_body = response.read()
  185. _close_connection(conn)
  186. # 解码响应体并创建响应对象
  187. decoded_body = _decode_response_body(raw_body, content_type)
  188. return HttpResponse(status, response.reason, response_headers, decoded_body)
  189. def _build_redirect_url(location, base, path):
  190. # type: (str, str, str) -> str
  191. """构建重定向URL,使用简单的字符串操作"""
  192. if location.startswith("http"):
  193. return location
  194. if location.startswith("/"):
  195. # 绝对路径:使用base的scheme和netloc
  196. base_url = urlparse(base)
  197. return "{}://{}{}".format(base_url.scheme, base_url.netloc, location)
  198. else:
  199. base_path = path.rsplit("/", 1)[0] if "/" in path else ""
  200. if not base_path.endswith("/"):
  201. base_path += "/"
  202. return base + base_path + location
  203. def _decode_response_body(raw_body, content_type):
  204. # type: (bytes, str | None) -> str
  205. """解码HTTP响应体,优先使用UTF-8"""
  206. if not raw_body:
  207. return ""
  208. # 从Content-Type提取charset
  209. charsets = ["utf-8", "gbk", "ascii", "latin-1"]
  210. if content_type and "charset=" in content_type.lower():
  211. start = content_type.lower().find("charset=") + 8
  212. end = content_type.find(";", start)
  213. if end == -1:
  214. end = len(content_type)
  215. charset = content_type[start:end].strip("'\" ").lower()
  216. charsets.insert(0, charset)
  217. # 处理常见别名
  218. if charset == "gb2312":
  219. charsets.remove("gbk")
  220. charsets.insert(0, "gbk")
  221. elif charset == "iso-8859-1":
  222. charsets.remove("latin-1")
  223. charsets.insert(0, "latin-1")
  224. # 按优先级尝试解码
  225. for encoding in charsets:
  226. try:
  227. return raw_body.decode(encoding)
  228. except (UnicodeDecodeError, LookupError):
  229. continue
  230. # 最终后备:UTF-8替换错误字符
  231. return raw_body.decode("utf-8", errors="replace")