test_cache.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671
  1. # -*- coding: utf-8 -*-
  2. """
  3. Test cases for cache module
  4. @author: GitHub Copilot
  5. """
  6. from __init__ import patch, unittest
  7. import os
  8. import tempfile
  9. from time import sleep
  10. from ddns.cache import Cache # noqa: E402
  11. class TestCache(unittest.TestCase):
  12. """Test cases for Cache class"""
  13. def setUp(self):
  14. """Set up test fixtures"""
  15. # Create a temporary directory for test cache files
  16. self.cache_file = tempfile.mktemp(prefix="ddns_test_cache_", suffix="pk1")
  17. # self.cache_file = os.path.join(self.test_dir, "test_cache.%d.pkl" % random.randint(1, 10000))
  18. def tearDown(self):
  19. """Clean up test fixtures"""
  20. # Remove the temporary directory and all its contents
  21. if os.path.exists(self.cache_file):
  22. os.remove(self.cache_file)
  23. def test_init_new_cache(self):
  24. """Test cache initialization with new cache file"""
  25. cache = Cache(self.cache_file)
  26. # Verify initialization
  27. self.assertEqual(len(cache), 0)
  28. self.assertIsInstance(cache.time, float)
  29. self.assertFalse(os.path.exists(self.cache_file)) # File not created until sync
  30. def test_init_with_logger(self):
  31. """Test cache initialization with custom logger"""
  32. import logging
  33. logger = logging.getLogger("test_logger")
  34. cache = Cache(self.cache_file, logger=logger)
  35. self.assertEqual(len(cache), 0)
  36. def test_init_with_sync(self):
  37. """Test cache initialization with sync enabled"""
  38. cache = Cache(self.cache_file, sync=True)
  39. self.assertEqual(len(cache), 0)
  40. def test_setitem_and_getitem(self):
  41. """Test setting and getting cache items"""
  42. cache = Cache(self.cache_file)
  43. # Test setting items
  44. cache["key1"] = "value1"
  45. cache["key2"] = "value2"
  46. # Test getting items
  47. self.assertEqual(cache["key1"], "value1")
  48. self.assertEqual(cache["key2"], "value2")
  49. self.assertEqual(len(cache), 2)
  50. def test_setitem_duplicate_value(self):
  51. """Test setting the same value twice doesn't trigger update"""
  52. cache = Cache(self.cache_file)
  53. with patch.object(cache, "_Cache__update") as mock_update:
  54. cache["key1"] = "value1"
  55. mock_update.assert_called_once()
  56. # Setting the same value should not trigger update
  57. mock_update.reset_mock()
  58. cache["key1"] = "value1"
  59. mock_update.assert_not_called()
  60. def test_delitem(self):
  61. """Test deleting cache items"""
  62. cache = Cache(self.cache_file)
  63. cache["key1"] = "value1"
  64. cache["key2"] = "value2"
  65. # Delete an item
  66. del cache["key1"]
  67. self.assertEqual(len(cache), 1)
  68. self.assertNotIn("key1", cache)
  69. self.assertIn("key2", cache)
  70. def test_delitem_nonexistent_key(self):
  71. """Test deleting non-existent key doesn't raise error (silent handling)"""
  72. cache = Cache(self.cache_file)
  73. cache["key1"] = "value1"
  74. # Should not raise any exception
  75. del cache["nonexistent"]
  76. # Original data should remain unchanged
  77. self.assertEqual(len(cache), 1)
  78. self.assertIn("key1", cache)
  79. def test_delitem_idempotent(self):
  80. """Test that multiple deletions of the same key are safe"""
  81. cache = Cache(self.cache_file)
  82. cache["key1"] = "value1"
  83. # First deletion should work
  84. del cache["key1"]
  85. self.assertEqual(len(cache), 0)
  86. self.assertNotIn("key1", cache)
  87. # Second deletion should be safe (no error)
  88. del cache["key1"]
  89. self.assertEqual(len(cache), 0)
  90. # Third deletion should also be safe
  91. del cache["key1"]
  92. self.assertEqual(len(cache), 0)
  93. def test_contains(self):
  94. """Test membership testing"""
  95. cache = Cache(self.cache_file)
  96. cache["key1"] = "value1"
  97. self.assertIn("key1", cache)
  98. self.assertNotIn("key2", cache)
  99. def test_clear(self):
  100. """Test clearing cache"""
  101. cache = Cache(self.cache_file)
  102. cache["key1"] = "value1"
  103. cache["key2"] = "value2"
  104. cache.clear()
  105. self.assertEqual(len(cache), 0)
  106. self.assertNotIn("key1", cache)
  107. self.assertNotIn("key2", cache)
  108. def test_clear_empty_cache(self):
  109. """Test clearing empty cache doesn't trigger update"""
  110. cache = Cache(self.cache_file)
  111. with patch.object(cache, "_Cache__update") as mock_update:
  112. cache.clear()
  113. mock_update.assert_not_called()
  114. def test_clear_preserves_private_fields(self):
  115. """Test that clear only removes non-private fields"""
  116. cache = Cache(self.cache_file)
  117. cache["normal1"] = "value1"
  118. cache["normal2"] = "value2"
  119. cache["__private"] = "private_value"
  120. # Check initial state
  121. self.assertEqual(len(cache), 2) # Only counts non-private fields
  122. # Clear should only remove non-private fields
  123. cache.clear()
  124. # Private field should still exist in underlying dict
  125. self.assertEqual(len(cache), 0)
  126. self.assertNotIn("normal1", cache)
  127. self.assertNotIn("normal2", cache)
  128. # Private field still exists but not counted/visible
  129. self.assertTrue("__private" in dict(cache))
  130. def test_iteration(self):
  131. """Test iterating over cache keys"""
  132. cache = Cache(self.cache_file)
  133. cache["key1"] = "value1"
  134. cache["key2"] = "value2"
  135. keys = list(cache)
  136. self.assertEqual(set(keys), {"key1", "key2"})
  137. def test_private_fields_excluded(self):
  138. """Test that private fields (starting with __) are excluded from operations"""
  139. cache = Cache(self.cache_file)
  140. cache["normal_key"] = "normal_value"
  141. # Manually add a private field to the underlying dict (for testing purposes)
  142. super(Cache, cache).__setitem__("__private_field", "private_value")
  143. # len() should exclude private fields
  144. self.assertEqual(len(cache), 1)
  145. # iteration should exclude private fields
  146. keys = list(cache)
  147. self.assertEqual(keys, ["normal_key"])
  148. # data() should exclude private fields
  149. data = cache.get(None)
  150. self.assertEqual(data, {"normal_key": "normal_value"})
  151. def test_private_field_operations_no_sync(self):
  152. """Test that private field operations don't trigger sync"""
  153. cache = Cache(self.cache_file)
  154. with patch.object(cache, "_Cache__update") as mock_update:
  155. # Setting private field should not trigger sync
  156. cache["__private"] = "private_value"
  157. mock_update.assert_not_called()
  158. # Modifying private field should not trigger sync
  159. cache["__private"] = "new_private_value"
  160. mock_update.assert_not_called()
  161. # Deleting private field should not trigger sync
  162. del cache["__private"]
  163. mock_update.assert_not_called()
  164. # Normal field operations should trigger sync
  165. cache["normal"] = "value"
  166. mock_update.assert_called_once()
  167. def test_private_fields_not_saved_to_file(self):
  168. """Test that private fields are not saved to file"""
  169. cache = Cache(self.cache_file)
  170. cache["normal_key"] = "normal_value"
  171. cache["__private_key"] = "private_value"
  172. # Sync to file
  173. cache.sync()
  174. # Load new cache instance
  175. cache2 = Cache(self.cache_file)
  176. # Only normal fields should be loaded
  177. self.assertEqual(len(cache2), 1)
  178. self.assertIn("normal_key", cache2)
  179. self.assertNotIn("__private_key", cache2)
  180. self.assertEqual(cache2["normal_key"], "normal_value")
  181. def test_data_method(self):
  182. """Test data method for getting cache contents"""
  183. cache = Cache(self.cache_file)
  184. cache["key1"] = "value1"
  185. cache["key2"] = "value2"
  186. # Test getting all data
  187. # data = cache.get()
  188. # self.assertEqual(data, {"key1": "value1", "key2": "value2"})
  189. # Test getting specific key
  190. self.assertEqual(cache.get("key1"), "value1")
  191. self.assertEqual(cache.get("nonexistent", "default"), "default")
  192. def test_data_method_with_sync(self):
  193. """Test data method with sync enabled calls load"""
  194. cache = Cache(self.cache_file, sync=True)
  195. with patch.object(cache, "load") as mock_load:
  196. cache.load()
  197. mock_load.assert_called_once()
  198. def test_sync_method(self):
  199. """Test sync method saves data to file"""
  200. cache = Cache(self.cache_file)
  201. cache["key1"] = "value1"
  202. cache["key2"] = "value2"
  203. # Sync should save to file
  204. result = cache.sync()
  205. self.assertIs(result, cache) # Should return self
  206. self.assertTrue(os.path.exists(self.cache_file))
  207. # Load another cache instance to verify data was saved
  208. cache2 = Cache(self.cache_file)
  209. self.assertEqual(len(cache2), 2)
  210. self.assertEqual(cache2["key1"], "value1")
  211. self.assertEqual(cache2["key2"], "value2")
  212. def test_sync_no_changes(self):
  213. """Test sync when no changes have been made after load"""
  214. # Create and save initial cache
  215. cache = Cache(self.cache_file)
  216. cache["key1"] = "value1"
  217. cache.sync() # This clears the __changed flag
  218. with patch("ddns.cache.dump") as mock_dump:
  219. cache.sync() # This should not call dump since no changes
  220. mock_dump.assert_not_called()
  221. def test_load_existing_file(self):
  222. """Test loading from existing cache file"""
  223. # Create initial cache and save data
  224. cache1 = Cache(self.cache_file)
  225. cache1["key1"] = "value1"
  226. cache1["key2"] = 2
  227. cache1.sync()
  228. # Load new cache instance
  229. cache2 = Cache(self.cache_file)
  230. self.assertEqual(len(cache2), 2)
  231. self.assertEqual(cache2["key1"], "value1")
  232. self.assertEqual(cache2["key2"], 2)
  233. def test_load_corrupted_file(self):
  234. """Test loading from corrupted cache file"""
  235. # Create a corrupted cache file
  236. with open(self.cache_file, "w") as f:
  237. f.write("corrupted data")
  238. # Should handle corruption gracefully
  239. cache = Cache(self.cache_file)
  240. self.assertEqual(len(cache), 0)
  241. def test_load_with_exception(self):
  242. """Test load method handles exceptions properly"""
  243. # Create a file first
  244. with open(self.cache_file, "wb") as f:
  245. f.write(b"invalid pickle data")
  246. cache = Cache(self.cache_file)
  247. with patch("ddns.cache.load", side_effect=Exception("Test error")):
  248. with patch.object(cache, "_Cache__logger") as mock_logger:
  249. cache.load()
  250. mock_logger.warning.assert_called_once()
  251. def test_time_property(self):
  252. """Test time property returns modification time"""
  253. cache = Cache(self.cache_file)
  254. initial_time = cache.time # type: float # type: ignore[assignment]
  255. self.assertIsInstance(initial_time, float)
  256. self.assertGreater(initial_time, 0)
  257. def test_close_method(self):
  258. """Test close method syncs and cleans up"""
  259. cache = Cache(self.cache_file)
  260. cache["key1"] = "value1"
  261. with patch.object(cache, "sync") as mock_sync:
  262. cache.close()
  263. mock_sync.assert_called_once()
  264. def test_str_representation(self):
  265. """Test string representation of cache"""
  266. cache = Cache(self.cache_file)
  267. cache["key1"] = "value1"
  268. str_repr = str(cache)
  269. self.assertIn("key1", str_repr)
  270. self.assertIn("value1", str_repr)
  271. def test_auto_sync_behavior(self):
  272. """Test auto-sync behavior when sync=True"""
  273. cache = Cache(self.cache_file, sync=True)
  274. with patch.object(cache, "sync") as mock_sync:
  275. cache["key1"] = "value1"
  276. mock_sync.assert_called()
  277. def test_no_auto_sync_behavior(self):
  278. """Test no auto-sync behavior when sync=False (default)"""
  279. cache = Cache(self.cache_file, sync=False)
  280. with patch.object(cache, "sync") as mock_sync:
  281. cache["key1"] = "value1"
  282. mock_sync.assert_not_called()
  283. def test_context_manager_like_usage(self):
  284. """Test using cache in a context-manager-like pattern"""
  285. cache = Cache(self.cache_file)
  286. cache["key1"] = "value1"
  287. cache["key2"] = "value2"
  288. # Manually call close (simulating __del__)
  289. cache.close()
  290. # Verify data was persisted
  291. cache2 = Cache(self.cache_file)
  292. self.assertEqual(len(cache2), 2)
  293. self.assertEqual(cache2["key1"], "value1")
  294. def test_update_time_on_changes(self):
  295. """Test that modification time is updated on changes"""
  296. cache = Cache(self.cache_file)
  297. initial_time = cache.time
  298. # Small delay to ensure time difference
  299. sleep(0.01)
  300. cache["key1"] = "value1"
  301. new_time = cache.time # type: float # type: ignore[assignment]
  302. self.assertGreater(new_time, initial_time) # type: ignore[comparison-overlap]
  303. def test_integration_multiple_operations(self):
  304. """Integration test with multiple operations"""
  305. cache = Cache(self.cache_file)
  306. # Add some data
  307. cache["user1"] = {"name": "Alice", "age": 30}
  308. cache["user2"] = {"name": "Bob", "age": 25}
  309. cache["config"] = {"debug": True, "timeout": 30}
  310. self.assertEqual(len(cache), 3)
  311. # Modify data
  312. cache["user1"]["age"] = 31 # This won't trigger update automatically
  313. cache["user1"] = {"name": "Alice", "age": 31} # This will
  314. # Delete data
  315. del cache["user2"]
  316. self.assertEqual(len(cache), 2)
  317. self.assertEqual(cache["user1"]["age"], 31)
  318. self.assertNotIn("user2", cache)
  319. # Persist and reload
  320. cache.sync()
  321. cache2 = Cache(self.cache_file)
  322. self.assertEqual(len(cache2), 2)
  323. self.assertEqual(cache2["user1"]["age"], 31)
  324. self.assertEqual(cache2["config"]["debug"], True)
  325. def test_mixed_public_private_operations(self):
  326. """Test mixed operations with public and private fields"""
  327. cache = Cache(self.cache_file)
  328. # Add mixed data
  329. cache["public1"] = "public_value1"
  330. cache["__private1"] = "private_value1"
  331. cache["public2"] = "public_value2"
  332. cache["__private2"] = "private_value2"
  333. # Only public fields should be counted
  334. self.assertEqual(len(cache), 2)
  335. # Only public fields should be iterable
  336. public_keys = list(cache)
  337. self.assertEqual(set(public_keys), {"public1", "public2"})
  338. # data() should only return public fields
  339. data = cache.get(None)
  340. self.assertEqual(data, {"public1": "public_value1", "public2": "public_value2"})
  341. # Delete operations
  342. del cache["public1"] # Should work
  343. del cache["__private1"] # Should work but not trigger sync
  344. del cache["nonexistent"] # Should be safe
  345. # Only one public field should remain
  346. self.assertEqual(len(cache), 1)
  347. self.assertEqual(list(cache), ["public2"])
  348. # Sync and reload
  349. cache.sync()
  350. cache2 = Cache(self.cache_file)
  351. # Only public field should be persisted
  352. self.assertEqual(len(cache2), 1)
  353. self.assertEqual(list(cache2), ["public2"])
  354. self.assertEqual(cache2["public2"], "public_value2")
  355. def test_str_representation_excludes_private(self):
  356. """Test that string representation only shows public fields"""
  357. cache = Cache(self.cache_file)
  358. cache["public"] = "public_value"
  359. cache["__private"] = "private_value"
  360. str_repr = str(cache)
  361. self.assertIn("public", str_repr)
  362. self.assertIn("public_value", str_repr)
  363. # Note: private fields might still appear in str() since it calls super().__str__()
  364. # This is acceptable as str() shows the raw dict content
  365. def test_json_format_verification(self):
  366. """Test that cache files are saved in JSON format"""
  367. import json
  368. cache = Cache(self.cache_file)
  369. cache["string_key"] = "string_value"
  370. cache["number_key"] = 42
  371. cache["list_key"] = [1, 2, 3]
  372. cache["dict_key"] = {"nested": "value"}
  373. # Save to file
  374. cache.sync()
  375. # Verify file exists and is valid JSON
  376. self.assertTrue(os.path.exists(self.cache_file))
  377. # Read the file and verify it's valid JSON
  378. with open(self.cache_file, "r") as f:
  379. file_content = f.read()
  380. parsed_json = json.loads(file_content)
  381. # Verify the content matches what we saved
  382. self.assertEqual(parsed_json["string_key"], "string_value")
  383. self.assertEqual(parsed_json["number_key"], 42)
  384. self.assertEqual(parsed_json["list_key"], [1, 2, 3])
  385. self.assertEqual(parsed_json["dict_key"], {"nested": "value"})
  386. # Verify the file contains readable JSON (not binary pickle data)
  387. self.assertIn('"string_key"', file_content)
  388. self.assertIn('"string_value"', file_content)
  389. def test_cache_new_disabled(self):
  390. """Test Cache.new with cache disabled (config_cache=False)"""
  391. import logging
  392. logger = logging.getLogger("test_logger")
  393. cache = Cache.new(False, "test_hash", logger)
  394. # Should return None when cache is disabled
  395. self.assertIsNone(cache)
  396. def test_cache_new_temp_file(self):
  397. """Test Cache.new with temp file creation (config_cache=True)"""
  398. import logging
  399. import tempfile
  400. logger = logging.getLogger("test_logger")
  401. test_hash = "test_hash_123"
  402. cache = Cache.new(True, test_hash, logger)
  403. # Should create a cache instance
  404. self.assertIsNotNone(cache)
  405. self.assertIsInstance(cache, Cache)
  406. # Should use temp directory with correct naming pattern
  407. expected_pattern = "ddns.%s.cache" % test_hash
  408. self.assertIn(expected_pattern, cache._Cache__filename)
  409. self.assertIn(tempfile.gettempdir(), cache._Cache__filename)
  410. # Clean up
  411. cache.close()
  412. def test_cache_new_custom_path(self):
  413. """Test Cache.new with custom cache file path"""
  414. import logging
  415. logger = logging.getLogger("test_logger")
  416. custom_path = self.cache_file
  417. cache = Cache.new(custom_path, "unused_hash", logger)
  418. # Should create a cache instance with custom path
  419. self.assertIsNotNone(cache)
  420. self.assertIsInstance(cache, Cache)
  421. self.assertEqual(cache._Cache__filename, custom_path)
  422. # Clean up
  423. cache.close()
  424. @patch('ddns.cache.time')
  425. def test_cache_new_outdated_cache(self, mock_time):
  426. """Test Cache.new with outdated cache file (>72 hours old)"""
  427. import logging
  428. import json
  429. logger = logging.getLogger("test_logger")
  430. # Create a cache file with some data
  431. test_data = {"test_key": "test_value"}
  432. with open(self.cache_file, "w") as f:
  433. json.dump(test_data, f)
  434. # Mock current time to make cache appear outdated
  435. # Cache file mtime will be recent, but we'll mock current time to be 73 hours later
  436. current_time = 1000000
  437. mock_time.return_value = current_time
  438. # Mock the file modification time to be 73 hours ago
  439. old_mtime = current_time - (73 * 3600) # 73 hours ago
  440. with patch('ddns.cache.stat') as mock_stat:
  441. mock_stat.return_value.st_mtime = old_mtime
  442. cache = Cache.new(self.cache_file, "test_hash", logger)
  443. # Should create cache instance but clear outdated data
  444. self.assertIsNotNone(cache)
  445. self.assertIsInstance(cache, Cache)
  446. self.assertEqual(len(cache), 0) # Should be empty due to age
  447. # Clean up
  448. cache.close()
  449. def test_cache_new_empty_cache(self):
  450. """Test Cache.new with empty cache file"""
  451. import logging
  452. import json
  453. logger = logging.getLogger("test_logger")
  454. # Create an empty cache file
  455. with open(self.cache_file, "w") as f:
  456. json.dump({}, f)
  457. cache = Cache.new(self.cache_file, "test_hash", logger)
  458. # Should create cache instance
  459. self.assertIsNotNone(cache)
  460. self.assertIsInstance(cache, Cache)
  461. self.assertEqual(len(cache), 0)
  462. # Clean up
  463. cache.close()
  464. @patch('ddns.cache.time')
  465. def test_cache_new_valid_cache(self, mock_time):
  466. """Test Cache.new with valid cache file with data"""
  467. import logging
  468. import json
  469. logger = logging.getLogger("test_logger")
  470. # Create a cache file with test data
  471. test_data = {
  472. "domain1.com": {"ip": "1.2.3.4", "timestamp": 1234567890},
  473. "domain2.com": {"ip": "5.6.7.8", "timestamp": 1234567891}
  474. }
  475. with open(self.cache_file, "w") as f:
  476. json.dump(test_data, f)
  477. # Mock current time to be within 72 hours of cache creation
  478. current_time = 1000000
  479. mock_time.return_value = current_time
  480. # Mock file modification time to be recent (within 72 hours)
  481. recent_mtime = current_time - (24 * 3600) # 24 hours ago
  482. with patch('ddns.cache.stat') as mock_stat:
  483. mock_stat.return_value.st_mtime = recent_mtime
  484. cache = Cache.new(self.cache_file, "test_hash", logger)
  485. # Should create cache instance with loaded data
  486. self.assertIsNotNone(cache)
  487. self.assertIsInstance(cache, Cache)
  488. self.assertEqual(len(cache), 2)
  489. self.assertEqual(cache["domain1.com"]["ip"], "1.2.3.4")
  490. self.assertEqual(cache["domain2.com"]["ip"], "5.6.7.8")
  491. # Clean up
  492. cache.close()
  493. def test_cache_new_nonexistent_file(self):
  494. """Test Cache.new with nonexistent cache file"""
  495. import logging
  496. logger = logging.getLogger("test_logger")
  497. nonexistent_path = tempfile.mktemp(prefix="ddns_test_nonexistent_", suffix=".cache")
  498. # Ensure file doesn't exist
  499. if os.path.exists(nonexistent_path):
  500. os.remove(nonexistent_path)
  501. cache = Cache.new(nonexistent_path, "test_hash", logger)
  502. # Should create cache instance with empty data
  503. self.assertIsNotNone(cache)
  504. self.assertIsInstance(cache, Cache)
  505. self.assertEqual(len(cache), 0)
  506. # Clean up
  507. cache.close()
  508. if __name__ == "__main__":
  509. unittest.main()