test_cache.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672
  1. # -*- coding: utf-8 -*-
  2. # type: ignore[index,operator,assignment]
  3. """
  4. Test cases for cache module
  5. @author: GitHub Copilot
  6. """
  7. from __init__ import patch, unittest
  8. import os
  9. import tempfile
  10. from time import sleep
  11. from ddns.cache import Cache # noqa: E402
  12. class TestCache(unittest.TestCase):
  13. """Test cases for Cache class"""
  14. def setUp(self):
  15. """Set up test fixtures"""
  16. # Create a temporary directory for test cache files
  17. self.cache_file = tempfile.mktemp(prefix="ddns_test_cache_", suffix="pk1")
  18. # self.cache_file = os.path.join(self.test_dir, "test_cache.%d.pkl" % random.randint(1, 10000))
  19. def tearDown(self):
  20. """Clean up test fixtures"""
  21. # Remove the temporary directory and all its contents
  22. if os.path.exists(self.cache_file):
  23. os.remove(self.cache_file)
  24. def test_init_new_cache(self):
  25. """Test cache initialization with new cache file"""
  26. cache = Cache(self.cache_file)
  27. # Verify initialization
  28. self.assertEqual(len(cache), 0)
  29. self.assertIsInstance(cache.time, float)
  30. self.assertFalse(os.path.exists(self.cache_file)) # File not created until sync
  31. def test_init_with_logger(self):
  32. """Test cache initialization with custom logger"""
  33. import logging
  34. logger = logging.getLogger("test_logger")
  35. cache = Cache(self.cache_file, logger=logger)
  36. self.assertEqual(len(cache), 0)
  37. def test_init_with_sync(self):
  38. """Test cache initialization with sync enabled"""
  39. cache = Cache(self.cache_file, sync=True)
  40. self.assertEqual(len(cache), 0)
  41. def test_setitem_and_getitem(self):
  42. """Test setting and getting cache items"""
  43. cache = Cache(self.cache_file)
  44. # Test setting items
  45. cache["key1"] = "value1"
  46. cache["key2"] = "value2"
  47. # Test getting items
  48. self.assertEqual(cache["key1"], "value1")
  49. self.assertEqual(cache["key2"], "value2")
  50. self.assertEqual(len(cache), 2)
  51. def test_setitem_duplicate_value(self):
  52. """Test setting the same value twice doesn't trigger update"""
  53. cache = Cache(self.cache_file)
  54. with patch.object(cache, "_Cache__update") as mock_update:
  55. cache["key1"] = "value1"
  56. mock_update.assert_called_once()
  57. # Setting the same value should not trigger update
  58. mock_update.reset_mock()
  59. cache["key1"] = "value1"
  60. mock_update.assert_not_called()
  61. def test_delitem(self):
  62. """Test deleting cache items"""
  63. cache = Cache(self.cache_file)
  64. cache["key1"] = "value1"
  65. cache["key2"] = "value2"
  66. # Delete an item
  67. del cache["key1"]
  68. self.assertEqual(len(cache), 1)
  69. self.assertNotIn("key1", cache)
  70. self.assertIn("key2", cache)
  71. def test_delitem_nonexistent_key(self):
  72. """Test deleting non-existent key doesn't raise error (silent handling)"""
  73. cache = Cache(self.cache_file)
  74. cache["key1"] = "value1"
  75. # Should not raise any exception
  76. del cache["nonexistent"]
  77. # Original data should remain unchanged
  78. self.assertEqual(len(cache), 1)
  79. self.assertIn("key1", cache)
  80. def test_delitem_idempotent(self):
  81. """Test that multiple deletions of the same key are safe"""
  82. cache = Cache(self.cache_file)
  83. cache["key1"] = "value1"
  84. # First deletion should work
  85. del cache["key1"]
  86. self.assertEqual(len(cache), 0)
  87. self.assertNotIn("key1", cache)
  88. # Second deletion should be safe (no error)
  89. del cache["key1"]
  90. self.assertEqual(len(cache), 0)
  91. # Third deletion should also be safe
  92. del cache["key1"]
  93. self.assertEqual(len(cache), 0)
  94. def test_contains(self):
  95. """Test membership testing"""
  96. cache = Cache(self.cache_file)
  97. cache["key1"] = "value1"
  98. self.assertIn("key1", cache)
  99. self.assertNotIn("key2", cache)
  100. def test_clear(self):
  101. """Test clearing cache"""
  102. cache = Cache(self.cache_file)
  103. cache["key1"] = "value1"
  104. cache["key2"] = "value2"
  105. cache.clear()
  106. self.assertEqual(len(cache), 0)
  107. self.assertNotIn("key1", cache)
  108. self.assertNotIn("key2", cache)
  109. def test_clear_empty_cache(self):
  110. """Test clearing empty cache doesn't trigger update"""
  111. cache = Cache(self.cache_file)
  112. with patch.object(cache, "_Cache__update") as mock_update:
  113. cache.clear()
  114. mock_update.assert_not_called()
  115. def test_clear_preserves_private_fields(self):
  116. """Test that clear only removes non-private fields"""
  117. cache = Cache(self.cache_file)
  118. cache["normal1"] = "value1"
  119. cache["normal2"] = "value2"
  120. cache["__private"] = "private_value"
  121. # Check initial state
  122. self.assertEqual(len(cache), 2) # Only counts non-private fields
  123. # Clear should only remove non-private fields
  124. cache.clear()
  125. # Private field should still exist in underlying dict
  126. self.assertEqual(len(cache), 0)
  127. self.assertNotIn("normal1", cache)
  128. self.assertNotIn("normal2", cache)
  129. # Private field still exists but not counted/visible
  130. self.assertTrue("__private" in dict(cache))
  131. def test_iteration(self):
  132. """Test iterating over cache keys"""
  133. cache = Cache(self.cache_file)
  134. cache["key1"] = "value1"
  135. cache["key2"] = "value2"
  136. keys = list(cache)
  137. self.assertEqual(set(keys), {"key1", "key2"})
  138. def test_private_fields_excluded(self):
  139. """Test that private fields (starting with __) are excluded from operations"""
  140. cache = Cache(self.cache_file)
  141. cache["normal_key"] = "normal_value"
  142. # Manually add a private field to the underlying dict (for testing purposes)
  143. super(Cache, cache).__setitem__("__private_field", "private_value")
  144. # len() should exclude private fields
  145. self.assertEqual(len(cache), 1)
  146. # iteration should exclude private fields
  147. keys = list(cache)
  148. self.assertEqual(keys, ["normal_key"])
  149. # data() should exclude private fields
  150. data = cache.get(None)
  151. self.assertEqual(data, {"normal_key": "normal_value"})
  152. def test_private_field_operations_no_sync(self):
  153. """Test that private field operations don't trigger sync"""
  154. cache = Cache(self.cache_file)
  155. with patch.object(cache, "_Cache__update") as mock_update:
  156. # Setting private field should not trigger sync
  157. cache["__private"] = "private_value"
  158. mock_update.assert_not_called()
  159. # Modifying private field should not trigger sync
  160. cache["__private"] = "new_private_value"
  161. mock_update.assert_not_called()
  162. # Deleting private field should not trigger sync
  163. del cache["__private"]
  164. mock_update.assert_not_called()
  165. # Normal field operations should trigger sync
  166. cache["normal"] = "value"
  167. mock_update.assert_called_once()
  168. def test_private_fields_not_saved_to_file(self):
  169. """Test that private fields are not saved to file"""
  170. cache = Cache(self.cache_file)
  171. cache["normal_key"] = "normal_value"
  172. cache["__private_key"] = "private_value"
  173. # Sync to file
  174. cache.sync()
  175. # Load new cache instance
  176. cache2 = Cache(self.cache_file)
  177. # Only normal fields should be loaded
  178. self.assertEqual(len(cache2), 1)
  179. self.assertIn("normal_key", cache2)
  180. self.assertNotIn("__private_key", cache2)
  181. self.assertEqual(cache2["normal_key"], "normal_value")
  182. def test_data_method(self):
  183. """Test data method for getting cache contents"""
  184. cache = Cache(self.cache_file)
  185. cache["key1"] = "value1"
  186. cache["key2"] = "value2"
  187. # Test getting all data
  188. # data = cache.get()
  189. # self.assertEqual(data, {"key1": "value1", "key2": "value2"})
  190. # Test getting specific key
  191. self.assertEqual(cache.get("key1"), "value1")
  192. self.assertEqual(cache.get("nonexistent", "default"), "default")
  193. def test_data_method_with_sync(self):
  194. """Test data method with sync enabled calls load"""
  195. cache = Cache(self.cache_file, sync=True)
  196. with patch.object(cache, "load") as mock_load:
  197. cache.load()
  198. mock_load.assert_called_once()
  199. def test_sync_method(self):
  200. """Test sync method saves data to file"""
  201. cache = Cache(self.cache_file)
  202. cache["key1"] = "value1"
  203. cache["key2"] = "value2"
  204. # Sync should save to file
  205. result = cache.sync()
  206. self.assertIs(result, cache) # Should return self
  207. self.assertTrue(os.path.exists(self.cache_file))
  208. # Load another cache instance to verify data was saved
  209. cache2 = Cache(self.cache_file)
  210. self.assertEqual(len(cache2), 2)
  211. self.assertEqual(cache2["key1"], "value1")
  212. self.assertEqual(cache2["key2"], "value2")
  213. def test_sync_no_changes(self):
  214. """Test sync when no changes have been made after load"""
  215. # Create and save initial cache
  216. cache = Cache(self.cache_file)
  217. cache["key1"] = "value1"
  218. cache.sync() # This clears the __changed flag
  219. with patch("ddns.cache.dump") as mock_dump:
  220. cache.sync() # This should not call dump since no changes
  221. mock_dump.assert_not_called()
  222. def test_load_existing_file(self):
  223. """Test loading from existing cache file"""
  224. # Create initial cache and save data
  225. cache1 = Cache(self.cache_file)
  226. cache1["key1"] = "value1"
  227. cache1["key2"] = 2
  228. cache1.sync()
  229. # Load new cache instance
  230. cache2 = Cache(self.cache_file)
  231. self.assertEqual(len(cache2), 2)
  232. self.assertEqual(cache2["key1"], "value1")
  233. self.assertEqual(cache2["key2"], 2)
  234. def test_load_corrupted_file(self):
  235. """Test loading from corrupted cache file"""
  236. # Create a corrupted cache file
  237. with open(self.cache_file, "w") as f:
  238. f.write("corrupted data")
  239. # Should handle corruption gracefully
  240. cache = Cache(self.cache_file)
  241. self.assertEqual(len(cache), 0)
  242. def test_load_with_exception(self):
  243. """Test load method handles exceptions properly"""
  244. # Create a file first
  245. with open(self.cache_file, "wb") as f:
  246. f.write(b"invalid pickle data")
  247. cache = Cache(self.cache_file)
  248. with patch("ddns.cache.load", side_effect=Exception("Test error")):
  249. with patch.object(cache, "_Cache__logger") as mock_logger:
  250. cache.load()
  251. mock_logger.warning.assert_called_once()
  252. def test_time_property(self):
  253. """Test time property returns modification time"""
  254. cache = Cache(self.cache_file)
  255. initial_time = cache.time # type: float # type: ignore[assignment]
  256. self.assertIsInstance(initial_time, float)
  257. self.assertGreater(initial_time, 0)
  258. def test_close_method(self):
  259. """Test close method syncs and cleans up"""
  260. cache = Cache(self.cache_file)
  261. cache["key1"] = "value1"
  262. with patch.object(cache, "sync") as mock_sync:
  263. cache.close()
  264. mock_sync.assert_called_once()
  265. def test_str_representation(self):
  266. """Test string representation of cache"""
  267. cache = Cache(self.cache_file)
  268. cache["key1"] = "value1"
  269. str_repr = str(cache)
  270. self.assertIn("key1", str_repr)
  271. self.assertIn("value1", str_repr)
  272. def test_auto_sync_behavior(self):
  273. """Test auto-sync behavior when sync=True"""
  274. cache = Cache(self.cache_file, sync=True)
  275. with patch.object(cache, "sync") as mock_sync:
  276. cache["key1"] = "value1"
  277. mock_sync.assert_called()
  278. def test_no_auto_sync_behavior(self):
  279. """Test no auto-sync behavior when sync=False (default)"""
  280. cache = Cache(self.cache_file, sync=False)
  281. with patch.object(cache, "sync") as mock_sync:
  282. cache["key1"] = "value1"
  283. mock_sync.assert_not_called()
  284. def test_context_manager_like_usage(self):
  285. """Test using cache in a context-manager-like pattern"""
  286. cache = Cache(self.cache_file)
  287. cache["key1"] = "value1"
  288. cache["key2"] = "value2"
  289. # Manually call close (simulating __del__)
  290. cache.close()
  291. # Verify data was persisted
  292. cache2 = Cache(self.cache_file)
  293. self.assertEqual(len(cache2), 2)
  294. self.assertEqual(cache2["key1"], "value1")
  295. def test_update_time_on_changes(self):
  296. """Test that modification time is updated on changes"""
  297. cache = Cache(self.cache_file)
  298. initial_time = cache.time
  299. # Small delay to ensure time difference
  300. sleep(0.01)
  301. cache["key1"] = "value1"
  302. new_time = cache.time # type: float # type: ignore[assignment]
  303. self.assertGreater(new_time, initial_time) # type: ignore[comparison-overlap]
  304. def test_integration_multiple_operations(self):
  305. """Integration test with multiple operations"""
  306. cache = Cache(self.cache_file)
  307. # Add some data
  308. cache["user1"] = {"name": "Alice", "age": 30}
  309. cache["user2"] = {"name": "Bob", "age": 25}
  310. cache["config"] = {"debug": True, "timeout": 30}
  311. self.assertEqual(len(cache), 3)
  312. # Modify data
  313. cache["user1"]["age"] = 31 # This won't trigger update automatically
  314. cache["user1"] = {"name": "Alice", "age": 31} # This will
  315. # Delete data
  316. del cache["user2"]
  317. self.assertEqual(len(cache), 2)
  318. self.assertEqual(cache["user1"]["age"], 31)
  319. self.assertNotIn("user2", cache)
  320. # Persist and reload
  321. cache.sync()
  322. cache2 = Cache(self.cache_file)
  323. self.assertEqual(len(cache2), 2)
  324. self.assertEqual(cache2["user1"]["age"], 31)
  325. self.assertEqual(cache2["config"]["debug"], True)
  326. def test_mixed_public_private_operations(self):
  327. """Test mixed operations with public and private fields"""
  328. cache = Cache(self.cache_file)
  329. # Add mixed data
  330. cache["public1"] = "public_value1"
  331. cache["__private1"] = "private_value1"
  332. cache["public2"] = "public_value2"
  333. cache["__private2"] = "private_value2"
  334. # Only public fields should be counted
  335. self.assertEqual(len(cache), 2)
  336. # Only public fields should be iterable
  337. public_keys = list(cache)
  338. self.assertEqual(set(public_keys), {"public1", "public2"})
  339. # data() should only return public fields
  340. data = cache.get(None)
  341. self.assertEqual(data, {"public1": "public_value1", "public2": "public_value2"})
  342. # Delete operations
  343. del cache["public1"] # Should work
  344. del cache["__private1"] # Should work but not trigger sync
  345. del cache["nonexistent"] # Should be safe
  346. # Only one public field should remain
  347. self.assertEqual(len(cache), 1)
  348. self.assertEqual(list(cache), ["public2"])
  349. # Sync and reload
  350. cache.sync()
  351. cache2 = Cache(self.cache_file)
  352. # Only public field should be persisted
  353. self.assertEqual(len(cache2), 1)
  354. self.assertEqual(list(cache2), ["public2"])
  355. self.assertEqual(cache2["public2"], "public_value2")
  356. def test_str_representation_excludes_private(self):
  357. """Test that string representation only shows public fields"""
  358. cache = Cache(self.cache_file)
  359. cache["public"] = "public_value"
  360. cache["__private"] = "private_value"
  361. str_repr = str(cache)
  362. self.assertIn("public", str_repr)
  363. self.assertIn("public_value", str_repr)
  364. # Note: private fields might still appear in str() since it calls super().__str__()
  365. # This is acceptable as str() shows the raw dict content
  366. def test_json_format_verification(self):
  367. """Test that cache files are saved in JSON format"""
  368. import json
  369. cache = Cache(self.cache_file)
  370. cache["string_key"] = "string_value"
  371. cache["number_key"] = 42
  372. cache["list_key"] = [1, 2, 3]
  373. cache["dict_key"] = {"nested": "value"}
  374. # Save to file
  375. cache.sync()
  376. # Verify file exists and is valid JSON
  377. self.assertTrue(os.path.exists(self.cache_file))
  378. # Read the file and verify it's valid JSON
  379. with open(self.cache_file, "r") as f:
  380. file_content = f.read()
  381. parsed_json = json.loads(file_content)
  382. # Verify the content matches what we saved
  383. self.assertEqual(parsed_json["string_key"], "string_value")
  384. self.assertEqual(parsed_json["number_key"], 42)
  385. self.assertEqual(parsed_json["list_key"], [1, 2, 3])
  386. self.assertEqual(parsed_json["dict_key"], {"nested": "value"})
  387. # Verify the file contains readable JSON (not binary pickle data)
  388. self.assertIn('"string_key"', file_content)
  389. self.assertIn('"string_value"', file_content)
  390. def test_cache_new_disabled(self):
  391. """Test Cache.new with cache disabled (config_cache=False)"""
  392. import logging
  393. logger = logging.getLogger("test_logger")
  394. cache = Cache.new(False, "test_hash", logger)
  395. # Should return None when cache is disabled
  396. self.assertIsNone(cache)
  397. def test_cache_new_temp_file(self):
  398. """Test Cache.new with temp file creation (config_cache=True)"""
  399. import logging
  400. import tempfile
  401. logger = logging.getLogger("test_logger")
  402. test_hash = "test_hash_123"
  403. cache = Cache.new(True, test_hash, logger)
  404. # Should create a cache instance
  405. self.assertIsNotNone(cache)
  406. self.assertIsInstance(cache, Cache)
  407. # Should use temp directory with correct naming pattern
  408. expected_pattern = "ddns.%s.cache" % test_hash
  409. self.assertIn(expected_pattern, cache._Cache__filename)
  410. self.assertIn(tempfile.gettempdir(), cache._Cache__filename)
  411. # Clean up
  412. cache.close()
  413. def test_cache_new_custom_path(self):
  414. """Test Cache.new with custom cache file path"""
  415. import logging
  416. logger = logging.getLogger("test_logger")
  417. custom_path = self.cache_file
  418. cache = Cache.new(custom_path, "unused_hash", logger)
  419. # Should create a cache instance with custom path
  420. self.assertIsNotNone(cache)
  421. self.assertIsInstance(cache, Cache)
  422. self.assertEqual(cache._Cache__filename, custom_path)
  423. # Clean up
  424. cache.close()
  425. @patch("ddns.cache.time")
  426. def test_cache_new_outdated_cache(self, mock_time):
  427. """Test Cache.new with outdated cache file (>72 hours old)"""
  428. import logging
  429. import json
  430. logger = logging.getLogger("test_logger")
  431. # Create a cache file with some data
  432. test_data = {"test_key": "test_value"}
  433. with open(self.cache_file, "w") as f:
  434. json.dump(test_data, f)
  435. # Mock current time to make cache appear outdated
  436. # Cache file mtime will be recent, but we'll mock current time to be 73 hours later
  437. current_time = 1000000
  438. mock_time.return_value = current_time
  439. # Mock the file modification time to be 73 hours ago
  440. old_mtime = current_time - (73 * 3600) # 73 hours ago
  441. with patch("ddns.cache.stat") as mock_stat:
  442. mock_stat.return_value.st_mtime = old_mtime
  443. cache = Cache.new(self.cache_file, "test_hash", logger)
  444. # Should create cache instance but clear outdated data
  445. self.assertIsNotNone(cache)
  446. self.assertIsInstance(cache, Cache)
  447. self.assertEqual(len(cache), 0) # Should be empty due to age
  448. # Clean up
  449. cache.close()
  450. def test_cache_new_empty_cache(self):
  451. """Test Cache.new with empty cache file"""
  452. import logging
  453. import json
  454. logger = logging.getLogger("test_logger")
  455. # Create an empty cache file
  456. with open(self.cache_file, "w") as f:
  457. json.dump({}, f)
  458. cache = Cache.new(self.cache_file, "test_hash", logger)
  459. # Should create cache instance
  460. self.assertIsNotNone(cache)
  461. self.assertIsInstance(cache, Cache)
  462. self.assertEqual(len(cache), 0)
  463. # Clean up
  464. cache.close()
  465. @patch("ddns.cache.time")
  466. def test_cache_new_valid_cache(self, mock_time):
  467. """Test Cache.new with valid cache file with data"""
  468. import logging
  469. import json
  470. logger = logging.getLogger("test_logger")
  471. # Create a cache file with test data
  472. test_data = {
  473. "domain1.com": {"ip": "1.2.3.4", "timestamp": 1234567890},
  474. "domain2.com": {"ip": "5.6.7.8", "timestamp": 1234567891},
  475. }
  476. with open(self.cache_file, "w") as f:
  477. json.dump(test_data, f)
  478. # Mock current time to be within 72 hours of cache creation
  479. current_time = 1000000
  480. mock_time.return_value = current_time
  481. # Mock file modification time to be recent (within 72 hours)
  482. recent_mtime = current_time - (24 * 3600) # 24 hours ago
  483. with patch("ddns.cache.stat") as mock_stat:
  484. mock_stat.return_value.st_mtime = recent_mtime
  485. cache = Cache.new(self.cache_file, "test_hash", logger)
  486. # Should create cache instance with loaded data
  487. self.assertIsNotNone(cache)
  488. self.assertIsInstance(cache, Cache)
  489. self.assertEqual(len(cache), 2)
  490. self.assertEqual(cache["domain1.com"]["ip"], "1.2.3.4")
  491. self.assertEqual(cache["domain2.com"]["ip"], "5.6.7.8")
  492. # Clean up
  493. cache.close()
  494. def test_cache_new_nonexistent_file(self):
  495. """Test Cache.new with nonexistent cache file"""
  496. import logging
  497. logger = logging.getLogger("test_logger")
  498. nonexistent_path = tempfile.mktemp(prefix="ddns_test_nonexistent_", suffix=".cache")
  499. # Ensure file doesn't exist
  500. if os.path.exists(nonexistent_path):
  501. os.remove(nonexistent_path)
  502. cache = Cache.new(nonexistent_path, "test_hash", logger)
  503. # Should create cache instance with empty data
  504. self.assertIsNotNone(cache)
  505. self.assertIsInstance(cache, Cache)
  506. self.assertEqual(len(cache), 0)
  507. # Clean up
  508. cache.close()
  509. if __name__ == "__main__":
  510. unittest.main()