bulk_import.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. #!/usr/bin/env python3
  2. """Bulk import memories from text files or JSON backups."""
import argparse
import json
import logging
import re
import sys
from pathlib import Path

import requests
  9. logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
  10. logger = logging.getLogger(__name__)
  11. def import_from_json(file_path: Path, base_url: str, api_key: str | None = None) -> int:
  12. """
  13. Import memories from JSON export file.
  14. Args:
  15. file_path: Path to JSON file
  16. base_url: Cognio server URL
  17. api_key: Optional API key for authentication
  18. Returns:
  19. Number of memories imported
  20. """
  21. with open(file_path, encoding="utf-8") as f:
  22. data = json.load(f)
  23. memories = data.get("memories", [])
  24. if not memories:
  25. logger.warning("No memories found in JSON file")
  26. return 0
  27. headers = {"X-API-Key": api_key} if api_key else {}
  28. imported = 0
  29. duplicates = 0
  30. for memory in memories:
  31. try:
  32. response = requests.post(
  33. f"{base_url}/memory/save",
  34. json={
  35. "text": memory["text"],
  36. "project": memory.get("project"),
  37. "tags": memory.get("tags", []),
  38. },
  39. headers=headers,
  40. timeout=30,
  41. )
  42. response.raise_for_status()
  43. result = response.json()
  44. if result.get("duplicate"):
  45. duplicates += 1
  46. logger.debug(f"Duplicate: {memory['text'][:50]}...")
  47. else:
  48. imported += 1
  49. logger.info(f"Imported: {result['id']}")
  50. except requests.exceptions.RequestException as e:
  51. logger.error(f"Failed to import memory: {e}")
  52. continue
  53. logger.info(f"Import complete: {imported} new, {duplicates} duplicates")
  54. return imported
  55. def import_from_text(
  56. file_path: Path, base_url: str, project: str | None = None, api_key: str | None = None
  57. ) -> int:
  58. """
  59. Import memories from plain text file.
  60. Each non-empty line becomes a separate memory.
  61. Lines starting with # are treated as comments and skipped.
  62. Args:
  63. file_path: Path to text file
  64. base_url: Cognio server URL
  65. project: Optional project name
  66. api_key: Optional API key for authentication
  67. Returns:
  68. Number of memories imported
  69. """
  70. with open(file_path, encoding="utf-8") as f:
  71. lines = f.readlines()
  72. headers = {"X-API-Key": api_key} if api_key else {}
  73. imported = 0
  74. duplicates = 0
  75. for line_num, line in enumerate(lines, 1):
  76. line = line.strip()
  77. # Skip empty lines and comments
  78. if not line or line.startswith("#"):
  79. continue
  80. try:
  81. response = requests.post(
  82. f"{base_url}/memory/save",
  83. json={"text": line, "project": project, "tags": ["imported"]},
  84. headers=headers,
  85. timeout=30,
  86. )
  87. response.raise_for_status()
  88. result = response.json()
  89. if result.get("duplicate"):
  90. duplicates += 1
  91. logger.debug(f"Line {line_num}: Duplicate")
  92. else:
  93. imported += 1
  94. logger.info(f"Line {line_num}: Imported {result['id']}")
  95. except requests.exceptions.RequestException as e:
  96. logger.error(f"Line {line_num}: Failed to import - {e}")
  97. continue
  98. logger.info(f"Import complete: {imported} new, {duplicates} duplicates")
  99. return imported
  100. def import_from_markdown(
  101. file_path: Path, base_url: str, project: str | None = None, api_key: str | None = None
  102. ) -> int:
  103. """
  104. Import memories from Markdown file.
  105. Extracts text from Markdown sections and imports each section as a memory.
  106. Sections are delimited by horizontal rules (---).
  107. Args:
  108. file_path: Path to Markdown file
  109. base_url: Cognio server URL
  110. project: Optional project name
  111. api_key: Optional API key for authentication
  112. Returns:
  113. Number of memories imported
  114. """
  115. with open(file_path, encoding="utf-8") as f:
  116. content = f.read()
  117. # Split by horizontal rules
  118. sections = [s.strip() for s in content.split("---") if s.strip()]
  119. headers = {"X-API-Key": api_key} if api_key else {}
  120. imported = 0
  121. duplicates = 0
  122. for idx, section in enumerate(sections, 1):
  123. # Skip title section
  124. if section.startswith("# Memory Export"):
  125. continue
  126. # Extract text (skip metadata lines)
  127. lines = section.split("\n")
  128. text_lines = [
  129. line
  130. for line in lines
  131. if line
  132. and not line.startswith("##")
  133. and not line.startswith("**Project**:")
  134. and not line.startswith("**Tags**:")
  135. and not line.startswith("**Created**:")
  136. ]
  137. if not text_lines:
  138. continue
  139. text = "\n".join(text_lines).strip()
  140. try:
  141. response = requests.post(
  142. f"{base_url}/memory/save",
  143. json={"text": text, "project": project, "tags": ["imported"]},
  144. headers=headers,
  145. timeout=30,
  146. )
  147. response.raise_for_status()
  148. result = response.json()
  149. if result.get("duplicate"):
  150. duplicates += 1
  151. logger.debug(f"Section {idx}: Duplicate")
  152. else:
  153. imported += 1
  154. logger.info(f"Section {idx}: Imported {result['id']}")
  155. except requests.exceptions.RequestException as e:
  156. logger.error(f"Section {idx}: Failed to import - {e}")
  157. continue
  158. logger.info(f"Import complete: {imported} new, {duplicates} duplicates")
  159. return imported
  160. def detect_format(file_path: Path, format_arg: str) -> str | None:
  161. """
  162. Detect file format from extension.
  163. Args:
  164. file_path: Path to file
  165. format_arg: Format argument from CLI
  166. Returns:
  167. Detected format or None if cannot detect
  168. """
  169. if format_arg != "auto":
  170. return format_arg
  171. suffix = file_path.suffix.lower()
  172. format_map = {
  173. ".json": "json",
  174. ".md": "markdown",
  175. ".txt": "text",
  176. ".text": "text",
  177. }
  178. return format_map.get(suffix)
  179. def check_server_health(url: str) -> bool:
  180. """
  181. Check if Cognio server is reachable.
  182. Args:
  183. url: Server URL
  184. Returns:
  185. True if server is healthy, False otherwise
  186. """
  187. try:
  188. response = requests.get(f"{url}/health", timeout=5)
  189. response.raise_for_status()
  190. logger.info(f"Connected to Cognio server at {url}")
  191. return True
  192. except requests.exceptions.RequestException as e:
  193. logger.error(f"Cannot connect to Cognio server: {e}")
  194. return False
  195. def perform_import(
  196. format_type: str, file_path: Path, url: str, project: str | None, api_key: str | None
  197. ) -> int:
  198. """
  199. Perform import based on format type.
  200. Args:
  201. format_type: Format type (json, text, markdown)
  202. file_path: Path to file
  203. url: Server URL
  204. project: Optional project name
  205. api_key: Optional API key
  206. Returns:
  207. Number of imported memories
  208. """
  209. if format_type == "json":
  210. return import_from_json(file_path, url, api_key)
  211. elif format_type == "text":
  212. return import_from_text(file_path, url, project, api_key)
  213. elif format_type == "markdown":
  214. return import_from_markdown(file_path, url, project, api_key)
  215. else:
  216. logger.error(f"Unsupported format: {format_type}")
  217. return -1
  218. def main() -> int:
  219. """Main entry point."""
  220. parser = argparse.ArgumentParser(
  221. description="Bulk import memories into Cognio",
  222. formatter_class=argparse.RawDescriptionHelpFormatter,
  223. epilog="""
  224. Examples:
  225. # Import from JSON export
  226. %(prog)s backup.json
  227. # Import from text file with project
  228. %(prog)s notes.txt --project LEARNING
  229. # Import from Markdown export
  230. %(prog)s memories.md --format markdown
  231. # Import with API key
  232. %(prog)s data.json --api-key your-secret-key
  233. # Import to custom server
  234. %(prog)s backup.json --url http://remote-server:8080
  235. """,
  236. )
  237. parser.add_argument("file", type=Path, help="File to import (JSON, TXT, or MD)")
  238. parser.add_argument(
  239. "--format",
  240. choices=["json", "text", "markdown", "auto"],
  241. default="auto",
  242. help="Input format (default: auto-detect from extension)",
  243. )
  244. parser.add_argument("--project", help="Project name for imported memories")
  245. parser.add_argument("--url", default="http://localhost:8080", help="Cognio server URL")
  246. parser.add_argument("--api-key", help="API key for authentication")
  247. parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
  248. args = parser.parse_args()
  249. if args.verbose:
  250. logging.getLogger().setLevel(logging.DEBUG)
  251. if not args.file.exists():
  252. logger.error(f"File not found: {args.file}")
  253. return 1
  254. format_type = detect_format(args.file, args.format)
  255. if not format_type:
  256. logger.error(
  257. f"Cannot auto-detect format for {args.file.suffix} files. Use --format option."
  258. )
  259. return 1
  260. logger.info(f"Importing from {args.file} (format: {format_type})")
  261. if not check_server_health(args.url):
  262. return 1
  263. try:
  264. imported = perform_import(format_type, args.file, args.url, args.project, args.api_key)
  265. if imported < 0:
  266. return 1
  267. if imported > 0:
  268. logger.info(f"Successfully imported {imported} memories")
  269. else:
  270. logger.warning("No new memories imported")
  271. return 0
  272. except Exception as e:
  273. logger.error(f"Import failed: {e}")
  274. return 1
  275. if __name__ == "__main__":
  276. sys.exit(main())