# test_util_http.py (15 KB)
  1. # coding=utf-8
  2. # type: ignore[index]
  3. """
  4. 测试 ddns.util.http 模块
  5. Test ddns.util.http module
  6. """
  7. from __future__ import unicode_literals
  8. from __init__ import unittest, sys
  9. import json
  10. import socket
  11. from ddns.util.http import (
  12. HttpResponse,
  13. _decode_response_body,
  14. quote,
  15. )
  16. # Python 2/3 compatibility
  17. if sys.version_info[0] == 2: # python 2
  18. text_type = unicode # noqa: F821
  19. binary_type = str
  20. else:
  21. text_type = str
  22. binary_type = bytes
  23. def to_bytes(s, encoding="utf-8"):
  24. if isinstance(s, text_type):
  25. return s.encode(encoding)
  26. return s
  27. def to_unicode(s, encoding="utf-8"):
  28. if isinstance(s, binary_type):
  29. return s.decode(encoding)
  30. return s
  31. def byte_string(s):
  32. if isinstance(s, text_type):
  33. return s.encode("utf-8")
  34. return s
  35. class TestHttpResponse(unittest.TestCase):
  36. """测试 HttpResponse 类"""
  37. def test_init(self):
  38. """测试初始化HttpResponse对象"""
  39. headers = [("Content-Type", "application/json"), ("Content-Length", "100")]
  40. response = HttpResponse(200, "OK", headers, '{"test": true}')
  41. self.assertEqual(response.status, 200)
  42. self.assertEqual(response.reason, "OK")
  43. self.assertEqual(response.headers, headers)
  44. self.assertEqual(response.body, '{"test": true}')
  45. def test_headers_get_case_insensitive(self):
  46. """测试headers.get方法不区分大小写"""
  47. # 模拟 response.info() 对象,支持不区分大小写的 get 方法
  48. class MockHeaders:
  49. def __init__(self):
  50. self._headers = {"Content-Type": "application/json", "Content-Length": "100"}
  51. def get(self, name, default=None):
  52. # 模拟 HTTPMessage 的不区分大小写查找
  53. for key, value in self._headers.items():
  54. if key.lower() == name.lower():
  55. return value
  56. return default
  57. headers = MockHeaders()
  58. response = HttpResponse(200, "OK", headers, "test")
  59. self.assertEqual(response.headers.get("content-type"), "application/json")
  60. self.assertEqual(response.headers.get("Content-Type"), "application/json")
  61. self.assertEqual(response.headers.get("CONTENT-TYPE"), "application/json")
  62. self.assertEqual(response.headers.get("content-length"), "100")
  63. def test_headers_get_not_found(self):
  64. """测试headers.get方法找不到头部时的默认值"""
  65. class MockHeaders:
  66. def __init__(self):
  67. self._headers = {"Content-Type": "application/json"}
  68. def get(self, name, default=None):
  69. for key, value in self._headers.items():
  70. if key.lower() == name.lower():
  71. return value
  72. return default
  73. headers = MockHeaders()
  74. response = HttpResponse(200, "OK", headers, "test")
  75. self.assertIsNone(response.headers.get("Authorization"))
  76. self.assertEqual(response.headers.get("Authorization", "default"), "default")
  77. def test_headers_get_first_match(self):
  78. """测试headers.get方法返回第一个匹配的头部"""
  79. class MockHeaders:
  80. def __init__(self):
  81. self._headers = {"Set-Cookie": "session=abc"} # 只保留第一个值
  82. def get(self, name, default=None):
  83. for key, value in self._headers.items():
  84. if key.lower() == name.lower():
  85. return value
  86. return default
  87. headers = MockHeaders()
  88. response = HttpResponse(200, "OK", headers, "test")
  89. self.assertEqual(response.headers.get("Set-Cookie"), "session=abc")
  90. class TestDecodeResponseBody(unittest.TestCase):
  91. """测试 _decode_response_body 函数"""
  92. def test_utf8_decoding(self):
  93. """测试UTF-8解码"""
  94. raw_body = to_bytes("中文测试", "utf-8")
  95. result = _decode_response_body(raw_body, "text/html; charset=utf-8")
  96. self.assertEqual(result, "中文测试")
  97. def test_gbk_decoding(self):
  98. """测试GBK解码"""
  99. raw_body = to_bytes("中文测试", "gbk")
  100. result = _decode_response_body(raw_body, "text/html; charset=gbk")
  101. self.assertEqual(result, "中文测试")
  102. def test_gb2312_alias(self):
  103. """测试GB2312别名映射到GBK"""
  104. raw_body = to_bytes("中文测试", "gbk")
  105. result = _decode_response_body(raw_body, "text/html; charset=gb2312")
  106. self.assertEqual(result, "中文测试")
  107. def test_iso_8859_1_alias(self):
  108. """测试ISO-8859-1别名映射到latin-1"""
  109. raw_body = to_bytes("test", "latin-1")
  110. result = _decode_response_body(raw_body, "text/html; charset=iso-8859-1")
  111. self.assertEqual(result, "test")
  112. def test_no_charset_fallback_to_utf8(self):
  113. """测试没有charset时默认使用UTF-8"""
  114. raw_body = to_bytes("test", "utf-8")
  115. result = _decode_response_body(raw_body, "text/html")
  116. self.assertEqual(result, "test")
  117. def test_no_content_type(self):
  118. """测试没有Content-Type时使用UTF-8"""
  119. raw_body = to_bytes("test", "utf-8")
  120. result = _decode_response_body(raw_body, None)
  121. self.assertEqual(result, "test")
  122. def test_empty_body(self):
  123. """测试空响应体"""
  124. result = _decode_response_body(byte_string(""), "text/html")
  125. self.assertEqual(result, "")
  126. def test_invalid_encoding_fallback(self):
  127. """测试无效编码时的后备机制"""
  128. raw_body = to_bytes("中文测试", "utf-8")
  129. # 指定一个无效的编码
  130. result = _decode_response_body(raw_body, "text/html; charset=invalid-encoding")
  131. self.assertEqual(result, "中文测试") # 应该回退到UTF-8
  132. def test_malformed_charset(self):
  133. """测试格式错误的charset"""
  134. raw_body = to_bytes("test", "utf-8")
  135. result = _decode_response_body(raw_body, "text/html; charset=")
  136. self.assertEqual(result, "test")
  137. class TestSendHttpRequest(unittest.TestCase):
  138. """测试 send_http_request 函数"""
  139. def test_get_request_real_api(self):
  140. """测试真实的GET请求 - 使用postman-echo API服务"""
  141. from ddns.util.http import send_http_request
  142. # 使用postman-echo.com提供的GET测试端点
  143. try:
  144. response = send_http_request("GET", "http://postman-echo.com/get?test=ddns")
  145. self.assertEqual(response.status, 200)
  146. self.assertIsNotNone(response.body)
  147. # 验证响应内容是JSON格式
  148. data = json.loads(response.body)
  149. self.assertIn("args", data)
  150. self.assertIn("url", data)
  151. self.assertIn("test", data["args"])
  152. self.assertEqual(data["args"]["test"], "ddns")
  153. except (socket.timeout, ConnectionError) as e:
  154. # 网络不可用时跳过测试
  155. self.skipTest("Network unavailable: {}".format(str(e)))
  156. def test_json_api_response(self):
  157. """测试JSON API响应解析"""
  158. from ddns.util.http import send_http_request
  159. try:
  160. # 使用postman-echo.com的基本GET端点,返回请求信息
  161. response = send_http_request("GET", "http://postman-echo.com/get?format=json")
  162. self.assertEqual(response.status, 200)
  163. self.assertIsNotNone(response.body)
  164. # 验证返回的是有效的JSON
  165. data = json.loads(response.body)
  166. # postman-echo返回请求信息对象
  167. self.assertIn("args", data)
  168. self.assertIn("url", data)
  169. self.assertIsInstance(data, dict)
  170. self.assertTrue(len(data) > 0)
  171. except Exception as e:
  172. self.skipTest("Network unavailable: {}".format(str(e)))
  173. def test_provider_api_patterns(self):
  174. """测试常见DNS provider API模式"""
  175. from ddns.util.http import send_http_request
  176. try:
  177. # 测试认证失败场景 - 使用postman-echo模拟401错误
  178. headers = {
  179. "Authorization": "Bearer invalid-token",
  180. "Content-Type": "application/json",
  181. "User-Agent": "DDNS-Client/4.0",
  182. }
  183. # 使用postman-echo模拟401认证失败响应
  184. response = send_http_request("GET", "http://postman-echo.com/status/401", headers=headers)
  185. # 应该返回401认证失败
  186. self.assertEqual(response.status, 401)
  187. # 401响应可能有空响应体,这是正常的
  188. self.assertIsNotNone(response.body)
  189. except Exception as e:
  190. self.skipTest("Network unavailable: {}".format(str(e)))
  191. def test_http_400_bad_request_handling(self):
  192. """测试HTTP 400 Bad Request错误处理"""
  193. from ddns.util.http import send_http_request
  194. try:
  195. # 使用postman-echo模拟400错误
  196. response = send_http_request("GET", "http://postman-echo.com/status/400")
  197. # 验证状态码为400
  198. self.assertEqual(response.status, 400, "应该返回400 Bad Request状态码")
  199. # 验证响应对象的完整性
  200. self.assertIsNotNone(response.body, "400响应应该有响应体")
  201. self.assertIsNotNone(response.headers, "400响应应该有响应头")
  202. self.assertIsNotNone(response.reason, "400响应应该有状态原因")
  203. except Exception as e:
  204. # 网络问题时跳过测试
  205. error_msg = str(e).lower()
  206. if any(
  207. keyword in error_msg for keyword in ["timeout", "connection", "resolution", "unreachable", "network"]
  208. ):
  209. self.skipTest("Network unavailable for HTTP 400 test: {}".format(str(e)))
  210. else:
  211. # 其他异常重新抛出
  212. raise
  213. def test_dns_over_https_simulation(self):
  214. """测试DNS类型的API响应解析"""
  215. from ddns.util.http import send_http_request
  216. try:
  217. headers = {"Accept": "application/dns-json", "User-Agent": "DDNS-Test/1.0"}
  218. # 使用postman-echo模拟一个带有特定结构的JSON响应
  219. response = send_http_request(
  220. "GET", "http://postman-echo.com/get?domain=example.com&type=A", headers=headers
  221. )
  222. self.assertEqual(response.status, 200)
  223. data = json.loads(response.body)
  224. # postman-echo返回请求信息,包含args参数
  225. self.assertIn("args", data)
  226. self.assertIn("domain", data["args"])
  227. self.assertEqual(data["args"]["domain"], "example.com")
  228. except Exception as e:
  229. self.skipTest("Network unavailable: {}".format(str(e)))
  230. def test_basic_auth_with_url_embedding(self):
  231. """测试URL嵌入式基本认证格式"""
  232. # 测试不同场景的URL嵌入认证格式
  233. test_cases = [
  234. {
  235. "username": "user",
  236. "password": "pass",
  237. "domain": "example.com",
  238. "expected": "https://user:[email protected]",
  239. },
  240. {
  241. "username": "[email protected]",
  242. "password": "password!",
  243. "domain": "api.service.com",
  244. "expected": "https://test%40email.com:password%[email protected]",
  245. },
  246. {
  247. "username": "user+tag",
  248. "password": "p@ss w0rd",
  249. "domain": "subdomain.example.org",
  250. "expected": "https://user%2Btag:p%40ss%[email protected]",
  251. },
  252. ]
  253. for case in test_cases:
  254. username_encoded = quote(case["username"], safe="")
  255. password_encoded = quote(case["password"], safe="")
  256. auth_url = "https://{0}:{1}@{2}".format(username_encoded, password_encoded, case["domain"])
  257. self.assertEqual(
  258. auth_url,
  259. case["expected"],
  260. "Failed for username={}, password={}".format(case["username"], case["password"]),
  261. )
  262. def test_basic_auth_with_httpbin(self):
  263. """Test basic auth URL format and verification with URL-embedded authentication"""
  264. from ddns.util.http import send_http_request
  265. # Test with special credentials containing @ and . characters
  266. special_username = "[email protected]"
  267. special_password = "passwo.rd"
  268. username_encoded = quote(special_username, safe="")
  269. password_encoded = quote(special_password, safe="")
  270. # Verify URL encoding of special characters
  271. self.assertEqual(username_encoded, "user%40test.com")
  272. self.assertEqual(password_encoded, "passwo.rd")
  273. # Create auth URL with encoded credentials in URL auth but original in path parameters
  274. auth_url = "https://{0}:{1}@httpbin.org/basic-auth/{2}/{3}".format(
  275. username_encoded, password_encoded, username_encoded, password_encoded
  276. )
  277. # Try to make actual request
  278. try:
  279. response = send_http_request("GET", auth_url)
  280. except (OSError, IOError) as e:
  281. # Skip for Network Exceptions (timeout, connection, etc.)
  282. raise unittest.SkipTest("Network error, skipping httpbin test: {0}".format(e))
  283. # Verify successful response if we get here
  284. if response.status > 500:
  285. # httpbin.org may return 500 if overloaded, skip this test
  286. raise unittest.SkipTest("httpbin.org returned 500, skipping test")
  287. self.assertEqual(response.status, 200)
  288. self.assertIn("authenticated", response.body)
  289. self.assertIn("user", response.body)
  290. def test_ssl_auto_fallback_real_network(self):
  291. """测试SSL auto模式的真实网络自动降级行为"""
  292. from ddns.util.http import send_http_request
  293. test_url = "https://postman-echo.com/status/200" # 使用postman-echo的测试站点
  294. try:
  295. # 1. 测试auto模式:应该自动降级成功
  296. response_auto = send_http_request("GET", test_url, verify_ssl="auto")
  297. self.assertEqual(response_auto.status, 200, "auto模式自动降级成功访问测试站点")
  298. self.assertIsNotNone(response_auto.body)
  299. # 2. 验证禁用SSL验证模式也能成功(作为对照)
  300. response_false = send_http_request("GET", test_url, verify_ssl=False)
  301. self.assertEqual(response_false.status, 200, "禁用SSL验证应该成功访问自签名证书站点")
  302. except Exception as e:
  303. # 网络问题时跳过测试
  304. error_msg = str(e).lower()
  305. if any(keyword in error_msg for keyword in ["timeout", "resolution", "unreachable"]):
  306. self.skipTest("Network unavailable for SSL fallback test: {}".format(str(e)))
  307. else:
  308. # 其他异常重新抛出
  309. raise
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()