# update_rules.py (3.2 KB)
  1. import argparse
  2. import configparser
  3. import glob
  4. import logging
  5. import os
  6. import shutil
  7. import stat
  8. from git import InvalidGitRepositoryError, Repo
  9. def del_rw(action, name: str, exc):
  10. os.chmod(name, stat.S_IWRITE)
  11. os.remove(name)
  12. def open_repo(path: str):
  13. if not os.path.exists(path):
  14. return None
  15. try:
  16. return Repo(path)
  17. except InvalidGitRepositoryError:
  18. return None
  19. def update_rules(repo_path: str, save_path: str, matches: list[str], keep_tree: bool):
  20. os.makedirs(save_path, exist_ok=True)
  21. for pattern in matches:
  22. files = glob.glob(os.path.join(repo_path, pattern), recursive=True)
  23. if len(files) == 0:
  24. logging.warn(f"no files found for pattern {pattern}")
  25. continue
  26. for file in files:
  27. if os.path.isdir(file):
  28. continue
  29. file_rel_path, file_name = os.path.split(os.path.relpath(file, repo_path))
  30. if keep_tree:
  31. file_dest_dir = os.path.join(save_path, file_rel_path)
  32. os.makedirs(file_dest_dir, exist_ok=True)
  33. file_dest_path = os.path.join(file_dest_dir, file_name)
  34. else:
  35. file_dest_path = os.path.join(save_path, file_name)
  36. shutil.copy2(file, file_dest_path)
  37. logging.info(f"copied {file} to {file_dest_path}")
  38. def main():
  39. parser = argparse.ArgumentParser()
  40. parser.add_argument("-c", "--config", default="rules_config.conf")
  41. args = parser.parse_args()
  42. config = configparser.ConfigParser()
  43. config.read(args.config)
  44. logging.basicConfig(format="%(asctime)s %(message)s", level=logging.DEBUG)
  45. for section in config.sections():
  46. repo = config.get(section, "name", fallback=section)
  47. url = config.get(section, "url")
  48. commit = config.get(section, "commit", fallback=None)
  49. branch = config.get(section, "branch", fallback=None)
  50. matches = config.get(section, "match").split("|")
  51. save_path = config.get(section, "dest", fallback=f"base/rules/{repo}")
  52. keep_tree = config.getboolean(section, "keep_tree", fallback=True)
  53. logging.info(f"reading files from url {url}, matches {matches}, save to {save_path} keep_tree {keep_tree}")
  54. repo_path = os.path.join("./tmp/repo/", repo)
  55. r = open_repo(repo_path)
  56. if r is None:
  57. logging.info(f"cloning repo {url} to {repo_path}")
  58. r = Repo.clone_from(url, repo_path)
  59. else:
  60. logging.info(f"repo {repo_path} exists")
  61. try:
  62. if commit is not None:
  63. logging.info(f"checking out to commit {commit}")
  64. r.git.checkout(commit)
  65. elif branch is not None:
  66. logging.info(f"checking out to branch {branch}")
  67. r.git.checkout(branch)
  68. else:
  69. logging.info(f"checking out to default branch")
  70. r.active_branch.checkout()
  71. except Exception as e:
  72. logging.error(f"checkout failed {e}")
  73. continue
  74. update_rules(repo_path, save_path, matches, keep_tree)
  75. shutil.rmtree("./tmp", ignore_errors=True)
  76. if __name__ == "__main__":
  77. main()