test_util_http.py 20 KB


  1. # coding=utf-8
  2. # type: ignore[index]
  3. """
  4. 测试 ddns.util.http 模块
  5. Test ddns.util.http module
  6. """
  7. from __future__ import unicode_literals
  8. from __init__ import unittest, sys
  9. import json
  10. import socket
  11. import random
  12. from ddns.util.http import HttpResponse, _decode_response_body, quote, USER_AGENT
  13. # Python 2/3 compatibility
  14. if sys.version_info[0] == 2: # python 2
  15. text_type = unicode # noqa: F821
  16. binary_type = str
  17. else:
  18. text_type = str
  19. binary_type = bytes
  20. def to_bytes(s, encoding="utf-8"):
  21. if isinstance(s, text_type):
  22. return s.encode(encoding)
  23. return s
  24. def to_unicode(s, encoding="utf-8"):
  25. if isinstance(s, binary_type):
  26. return s.decode(encoding)
  27. return s
  28. def byte_string(s):
  29. if isinstance(s, text_type):
  30. return s.encode("utf-8")
  31. return s
  32. class TestUserAgent(unittest.TestCase):
  33. """测试 USER_AGENT 常量和用户代理头部设置"""
  34. def _test_user_agent_with_endpoints(self, headers=None, expected_ua=None):
  35. """通用的User-Agent测试辅助方法"""
  36. from ddns.util.http import request
  37. # 多个测试站点,随机顺序以分散负载
  38. test_endpoints = [
  39. "https://httpbingo.org/user-agent",
  40. "http://postman-echo.com/headers",
  41. "http://httpbin.org/user-agent",
  42. ]
  43. if not expected_ua:
  44. test_endpoints.remove("https://httpbingo.org/user-agent") # 如果不验证User-Agent,则不需要这个端点
  45. random.shuffle(test_endpoints)
  46. for endpoint in test_endpoints:
  47. try:
  48. response = request("GET", endpoint, headers=headers, retries=1)
  49. if response.status == 200:
  50. response_data = json.loads(response.body)
  51. # 不同的测试站点响应格式可能略有不同
  52. headers = response_data["headers"] if "headers" in response_data else response_data
  53. # 查找User-Agent头部(不区分大小写)
  54. ua_key = next((key for key in headers if key.lower() == "user-agent"), None)
  55. if ua_key:
  56. ua_value = headers[ua_key]
  57. if expected_ua:
  58. self.assertEqual(ua_value, expected_ua)
  59. else:
  60. # 只验证不是None和空字符串
  61. self.assertIn(ua_value, (None, ""))
  62. else:
  63. self.assertIn(expected_ua, (None, ""))
  64. return True # 测试成功
  65. except OSError as e:
  66. error_msg = str(e).lower()
  67. # 不允许None错误时,网络问题继续尝试,其他错误重新抛出
  68. network_keywords = ["timeout", "connection", "resolution", "unreachable", "network"]
  69. if not any(keyword in error_msg for keyword in network_keywords):
  70. # 如果不是网络问题,重新抛出异常
  71. raise
  72. continue
  73. # 所有端点都失败
  74. return False
  75. def test_user_agent_constant(self):
  76. """测试USER_AGENT常量格式正确"""
  77. # 验证USER_AGENT常量存在且格式正确
  78. self.assertIsInstance(USER_AGENT, str)
  79. self.assertIn("DDNS/", USER_AGENT)
  80. self.assertIn("[email protected]", USER_AGENT)
  81. def test_user_agent_version_format(self):
  82. """测试USER_AGENT包含版本信息"""
  83. # USER_AGENT应该包含版本号或"dev"
  84. has_version = any(char.isdigit() for char in USER_AGENT) or "dev" in USER_AGENT
  85. self.assertTrue(has_version, "USER_AGENT should contain version number or 'dev'")
  86. def test_request_sets_user_agent_automatically(self):
  87. """测试request函数自动设置User-Agent头部"""
  88. if not self._test_user_agent_with_endpoints(expected_ua=USER_AGENT):
  89. self.skipTest("All test endpoints unavailable for user-agent test")
  90. def test_custom_user_agent_override(self):
  91. """测试自定义User-Agent头部会覆盖默认值"""
  92. custom_ua = "CustomAgent/1.0"
  93. headers = {"User-Agent": custom_ua}
  94. if self._test_user_agent_with_endpoints(headers=headers, expected_ua=custom_ua):
  95. # 额外验证不是默认的USER_AGENT
  96. self.assertNotEqual(custom_ua, USER_AGENT)
  97. else:
  98. self.skipTest("All test endpoints unavailable for custom user-agent test")
  99. def test_case_insensitive_user_agent_header(self):
  100. """测试User-Agent头部大小写不敏感处理"""
  101. custom_ua = "TestAgent/2.0"
  102. # 使用小写的user-agent头部
  103. headers = {"user-agent": custom_ua}
  104. if self._test_user_agent_with_endpoints(headers=headers, expected_ua=custom_ua):
  105. # 额外验证不是默认的USER_AGENT
  106. self.assertNotEqual(custom_ua, USER_AGENT)
  107. else:
  108. self.skipTest("All test endpoints unavailable for case-insensitive user-agent test")
  109. def test_user_agent_header_with_empty_value(self):
  110. """测试User-Agent头部设置为空字符串时的行为"""
  111. # 显式设置User-Agent为空字符串
  112. headers = {"User-Agent": ""}
  113. if not self._test_user_agent_with_endpoints(headers=headers, expected_ua=""):
  114. self.skipTest("All test endpoints unavailable or empty User-Agent causes expected failures")
  115. class TestHttpResponse(unittest.TestCase):
  116. """测试 HttpResponse 类"""
  117. def test_init(self):
  118. """测试初始化HttpResponse对象"""
  119. headers = [("Content-Type", "application/json"), ("Content-Length", "100")]
  120. response = HttpResponse(200, "OK", headers, '{"test": true}')
  121. self.assertEqual(response.status, 200)
  122. self.assertEqual(response.reason, "OK")
  123. self.assertEqual(response.headers, headers)
  124. self.assertEqual(response.body, '{"test": true}')
  125. def test_headers_get_case_insensitive(self):
  126. """测试headers.get方法不区分大小写"""
  127. # 模拟 response.info() 对象,支持不区分大小写的 get 方法
  128. class MockHeaders:
  129. def __init__(self):
  130. self._headers = {"Content-Type": "application/json", "Content-Length": "100"}
  131. def get(self, name, default=None):
  132. # 模拟 HTTPMessage 的不区分大小写查找
  133. for key, value in self._headers.items():
  134. if key.lower() == name.lower():
  135. return value
  136. return default
  137. headers = MockHeaders()
  138. response = HttpResponse(200, "OK", headers, "test")
  139. self.assertEqual(response.headers.get("content-type"), "application/json")
  140. self.assertEqual(response.headers.get("Content-Type"), "application/json")
  141. self.assertEqual(response.headers.get("CONTENT-TYPE"), "application/json")
  142. self.assertEqual(response.headers.get("content-length"), "100")
  143. def test_headers_get_not_found(self):
  144. """测试headers.get方法找不到头部时的默认值"""
  145. class MockHeaders:
  146. def __init__(self):
  147. self._headers = {"Content-Type": "application/json"}
  148. def get(self, name, default=None):
  149. for key, value in self._headers.items():
  150. if key.lower() == name.lower():
  151. return value
  152. return default
  153. headers = MockHeaders()
  154. response = HttpResponse(200, "OK", headers, "test")
  155. self.assertIsNone(response.headers.get("Authorization"))
  156. self.assertEqual(response.headers.get("Authorization", "default"), "default")
  157. def test_headers_get_first_match(self):
  158. """测试headers.get方法返回第一个匹配的头部"""
  159. class MockHeaders:
  160. def __init__(self):
  161. self._headers = {"Set-Cookie": "session=abc"} # 只保留第一个值
  162. def get(self, name, default=None):
  163. for key, value in self._headers.items():
  164. if key.lower() == name.lower():
  165. return value
  166. return default
  167. headers = MockHeaders()
  168. response = HttpResponse(200, "OK", headers, "test")
  169. self.assertEqual(response.headers.get("Set-Cookie"), "session=abc")
  170. class TestDecodeResponseBody(unittest.TestCase):
  171. """测试 _decode_response_body 函数"""
  172. def test_utf8_decoding(self):
  173. """测试UTF-8解码"""
  174. raw_body = to_bytes("中文测试", "utf-8")
  175. result = _decode_response_body(raw_body, "text/html; charset=utf-8")
  176. self.assertEqual(result, "中文测试")
  177. def test_gbk_decoding(self):
  178. """测试GBK解码"""
  179. raw_body = to_bytes("中文测试", "gbk")
  180. result = _decode_response_body(raw_body, "text/html; charset=gbk")
  181. self.assertEqual(result, "中文测试")
  182. def test_gb2312_alias(self):
  183. """测试GB2312别名映射到GBK"""
  184. raw_body = to_bytes("中文测试", "gbk")
  185. result = _decode_response_body(raw_body, "text/html; charset=gb2312")
  186. self.assertEqual(result, "中文测试")
  187. def test_iso_8859_1_alias(self):
  188. """测试ISO-8859-1别名映射到latin-1"""
  189. raw_body = to_bytes("test", "latin-1")
  190. result = _decode_response_body(raw_body, "text/html; charset=iso-8859-1")
  191. self.assertEqual(result, "test")
  192. def test_no_charset_fallback_to_utf8(self):
  193. """测试没有charset时默认使用UTF-8"""
  194. raw_body = to_bytes("test", "utf-8")
  195. result = _decode_response_body(raw_body, "text/html")
  196. self.assertEqual(result, "test")
  197. def test_no_content_type(self):
  198. """测试没有Content-Type时使用UTF-8"""
  199. raw_body = to_bytes("test", "utf-8")
  200. result = _decode_response_body(raw_body, None)
  201. self.assertEqual(result, "test")
  202. def test_empty_body(self):
  203. """测试空响应体"""
  204. result = _decode_response_body(byte_string(""), "text/html")
  205. self.assertEqual(result, "")
  206. def test_invalid_encoding_fallback(self):
  207. """测试无效编码时的后备机制"""
  208. raw_body = to_bytes("中文测试", "utf-8")
  209. # 指定一个无效的编码
  210. result = _decode_response_body(raw_body, "text/html; charset=invalid-encoding")
  211. self.assertEqual(result, "中文测试") # 应该回退到UTF-8
  212. def test_malformed_charset(self):
  213. """测试格式错误的charset"""
  214. raw_body = to_bytes("test", "utf-8")
  215. result = _decode_response_body(raw_body, "text/html; charset=")
  216. self.assertEqual(result, "test")
  217. class TestSendHttpRequest(unittest.TestCase):
  218. """测试 request 函数"""
  219. def test_basic_get_request_with_json_response(self):
  220. """测试基本GET请求和JSON响应解析"""
  221. from ddns.util.http import request
  222. try:
  223. response = request("GET", "http://postman-echo.com/get?test=ddns&format=json")
  224. self.assertEqual(response.status, 200)
  225. self.assertIsNotNone(response.body)
  226. # 验证响应内容是JSON格式
  227. data = json.loads(response.body)
  228. self.assertIn("args", data)
  229. self.assertIn("url", data)
  230. self.assertIn("test", data["args"])
  231. self.assertEqual(data["args"]["test"], "ddns")
  232. self.assertIsInstance(data, dict)
  233. self.assertTrue(len(data) > 0)
  234. except (socket.timeout, ConnectionError) as e:
  235. self.skipTest("Network unavailable: {}".format(str(e)))
  236. except Exception as e:
  237. error_msg = str(e).lower()
  238. network_keywords = ["timeout", "connection", "resolution", "unreachable", "network"]
  239. if any(keyword in error_msg for keyword in network_keywords):
  240. self.skipTest("Network unavailable for GET request test: {}".format(str(e)))
  241. else:
  242. raise
  243. def test_http_401_status_code_with_headers(self):
  244. """测试HTTP 401认证失败状态码处理"""
  245. from ddns.util.http import request
  246. try:
  247. headers = {
  248. "Authorization": "Bearer invalid-token",
  249. "Content-Type": "application/json",
  250. "User-Agent": "DDNS-Client/4.0",
  251. }
  252. response = request("GET", "http://postman-echo.com/status/401", headers=headers)
  253. self.assertEqual(response.status, 401)
  254. self.assertIsNotNone(response.body)
  255. except (socket.timeout, ConnectionError) as e:
  256. self.skipTest("Network unavailable: {}".format(str(e)))
  257. except Exception as e:
  258. error_msg = str(e).lower()
  259. network_keywords = ["timeout", "connection", "resolution", "unreachable", "network"]
  260. if any(keyword in error_msg for keyword in network_keywords):
  261. self.skipTest("Network unavailable for 401 status test: {}".format(str(e)))
  262. else:
  263. raise
  264. def test_ssl_auto_mode(self):
  265. """测试SSL auto模式"""
  266. from ddns.util.http import request
  267. try:
  268. response = request("GET", "https://postman-echo.com/status/200", verify="auto")
  269. self.assertEqual(response.status, 200, "SSL auto模式应该成功")
  270. self.assertIsNotNone(response.body)
  271. except (socket.timeout, ConnectionError) as e:
  272. self.skipTest("Network unavailable: {}".format(str(e)))
  273. except Exception as e:
  274. error_msg = str(e).lower()
  275. network_keywords = ["timeout", "connection", "resolution", "unreachable", "network", "ssl", "certificate"]
  276. if any(keyword in error_msg for keyword in network_keywords):
  277. self.skipTest("Network or SSL unavailable for SSL auto test: {}".format(str(e)))
  278. else:
  279. raise
  280. def test_http_400_status_code(self):
  281. """测试HTTP 400 Bad Request状态码"""
  282. from ddns.util.http import request
  283. try:
  284. response_400 = request("GET", "http://postman-echo.com/status/400")
  285. self.assertEqual(response_400.status, 400, "应该返回400 Bad Request状态码")
  286. self.assertIsNotNone(response_400.body, "400响应应该有响应体")
  287. self.assertIsNotNone(response_400.headers, "400响应应该有响应头")
  288. self.assertIsNotNone(response_400.reason, "400响应应该有状态原因")
  289. except Exception as e:
  290. # 网络问题时跳过测试
  291. error_msg = str(e).lower()
  292. network_keywords = ["timeout", "connection", "resolution", "unreachable", "network"]
  293. if any(keyword in error_msg for keyword in network_keywords):
  294. self.skipTest("Network unavailable for HTTP 400 status test: {}".format(str(e)))
  295. else:
  296. # 其他异常重新抛出
  297. raise
  298. def test_basic_auth_with_url_embedding(self):
  299. """测试URL嵌入式基本认证格式"""
  300. # 测试不同场景的URL嵌入认证格式
  301. test_cases = [
  302. {
  303. "username": "user",
  304. "password": "pass",
  305. "domain": "example.com",
  306. "expected": "https://user:[email protected]",
  307. },
  308. {
  309. "username": "[email protected]",
  310. "password": "password!",
  311. "domain": "api.service.com",
  312. "expected": "https://test%40email.com:password%[email protected]",
  313. },
  314. {
  315. "username": "user+tag",
  316. "password": "p@ss w0rd",
  317. "domain": "subdomain.example.org",
  318. "expected": "https://user%2Btag:p%40ss%[email protected]",
  319. },
  320. ]
  321. for case in test_cases:
  322. username_encoded = quote(case["username"], safe="")
  323. password_encoded = quote(case["password"], safe="")
  324. auth_url = "https://{0}:{1}@{2}".format(username_encoded, password_encoded, case["domain"])
  325. self.assertEqual(
  326. auth_url,
  327. case["expected"],
  328. "Failed for username={}, password={}".format(case["username"], case["password"]),
  329. )
  330. def test_http_get_redirect(self):
  331. """测试HTTP GET重定向处理"""
  332. from ddns.util.http import request
  333. # 尝试多个测试端点以提高可靠性
  334. test_endpoints = [
  335. "http://httpbin.org/redirect-to?url=http://httpbin.org/get",
  336. "http://httpbingo.org/redirect-to?url=http://httpbingo.org/get",
  337. ]
  338. last_exception = None
  339. for redirect_url in test_endpoints:
  340. try:
  341. # HTTP重定向处理 - GET重定向
  342. response = request("GET", redirect_url, verify=False, retries=3)
  343. # 重定向后应该成功
  344. if response.status == 200:
  345. self.assertIsNotNone(response.body)
  346. # 验证最终到达了正确的端点
  347. data = json.loads(response.body)
  348. self.assertIn("url", data)
  349. expected_content = "httpbin.org/get" if "httpbin.org" in redirect_url else "httpbingo.org/get"
  350. self.assertIn(expected_content, data["url"])
  351. return # 成功则退出
  352. elif response.status >= 500:
  353. # 5xx错误,尝试下一个端点
  354. continue
  355. except Exception as e:
  356. last_exception = e
  357. # 网络问题时继续尝试下一个端点
  358. error_msg = str(e).lower()
  359. network_keywords = [
  360. "timeout",
  361. "connection",
  362. "resolution",
  363. "unreachable",
  364. "network",
  365. "ssl",
  366. "certificate",
  367. ]
  368. if any(keyword in error_msg for keyword in network_keywords):
  369. continue # 尝试下一个端点
  370. else:
  371. # 其他异常重新抛出
  372. raise
  373. # 如果所有端点都失败,跳过测试
  374. error_info = " - Last error: {}".format(str(last_exception)) if last_exception else ""
  375. self.skipTest("All network endpoints unavailable for GET redirect test{}".format(error_info))
  376. def test_http_post_redirect(self):
  377. """测试HTTP POST重定向行为(应该转换为GET请求)"""
  378. from ddns.util.http import request
  379. # 尝试多个测试端点以提高可靠性
  380. test_endpoints = [
  381. "http://httpbingo.org/redirect-to?url=/get",
  382. "http://httpbin.org/redirect-to?url=http://httpbin.org/get",
  383. ]
  384. last_exception = None
  385. for redirect_url in test_endpoints:
  386. try:
  387. post_data = "test=data&method=POST->GET"
  388. response_post = request("POST", redirect_url, data=post_data, verify=False, retries=3)
  389. # 重定向后应该成功
  390. if response_post.status == 200:
  391. self.assertIsNotNone(response_post.body)
  392. # 验证最终到达了GET端点
  393. data_post = json.loads(response_post.body)
  394. self.assertIn("url", data_post)
  395. self.assertIn(".org/get", data_post["url"])
  396. return # 成功则退出
  397. elif response_post.status >= 500:
  398. # 5xx错误,尝试下一个端点
  399. continue
  400. except Exception as e:
  401. last_exception = e
  402. # 网络问题时继续尝试下一个端点
  403. error_msg = str(e).lower()
  404. network_keywords = [
  405. "timeout",
  406. "connection",
  407. "resolution",
  408. "unreachable",
  409. "network",
  410. "ssl",
  411. "certificate",
  412. ]
  413. if any(keyword in error_msg for keyword in network_keywords):
  414. continue # 尝试下一个端点
  415. else:
  416. # 其他异常重新抛出
  417. raise
  418. # 如果所有端点都失败,跳过测试
  419. error_info = " - Last error: {}".format(str(last_exception)) if last_exception else ""
  420. self.skipTest("All network endpoints unavailable for POST redirect test{}".format(error_info))
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()