check-jsonschema.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import argparse
  2. import json
  3. import logging
  4. import os
  5. import sys
  6. from typing import Any
  7. from json_source_map import calculate
  8. from json_source_map.errors import InvalidInputError
  9. from jsonschema import Draft7Validator
  10. def discover_schema_file(filename: str) -> tuple[str | None, Any]:
  11. logger = logging.getLogger()
  12. with open(filename) as json_file:
  13. json_data = json.load(json_file)
  14. schema_filename = json_data.get("$schema", None)
  15. if not schema_filename:
  16. logger.info(f"ℹ️ ${filename} has no schema definition")
  17. return (None, None)
  18. schema_file = os.path.join(os.path.dirname(filename), schema_filename)
  19. with open(schema_file) as schema_file:
  20. schema_data = json.load(schema_file)
  21. return (str(schema_file), schema_data)
  22. def validate_json_files(
  23. schema_data: dict[Any, Any], json_file_name: str
  24. ) -> list[dict[str, str]]:
  25. logger = logging.getLogger()
  26. with open(json_file_name) as json_file:
  27. text_data = json_file.read()
  28. json_data = json.loads(text_data)
  29. source_map = calculate(text_data)
  30. validator = Draft7Validator(schema_data)
  31. violations = []
  32. for violation in sorted(validator.iter_errors(json_data), key=str):
  33. logger.info(
  34. f"⚠️ Schema violation in file '{json_file_name}':\n{violation}\n----\n"
  35. )
  36. if len(violation.absolute_path):
  37. error_path = "/".join(
  38. str(path_element) for path_element in violation.absolute_path
  39. )
  40. error_entry = source_map["/{}".format(error_path)]
  41. violation_data = {
  42. "file": json_file_name,
  43. "title": "Validation Error",
  44. "message": violation.message,
  45. "annotation_level": "failure",
  46. "start_line": error_entry.value_start.line + 1,
  47. "end_line": error_entry.value_end.line + 1,
  48. }
  49. violations.append(violation_data)
  50. return violations
  51. def main() -> int:
  52. parser = argparse.ArgumentParser(
  53. description="Validate JSON files by schema definition"
  54. )
  55. parser.add_argument(
  56. "json_files", metavar="FILE", type=str, nargs="+", help="JSON file to validate"
  57. )
  58. parser.add_argument(
  59. "--loglevel", type=str, help="Set log level", default="WARNING", required=False
  60. )
  61. arguments = parser.parse_args()
  62. logging.basicConfig(level=arguments.loglevel, format="%(levelname)s - %(message)s")
  63. logger = logging.getLogger()
  64. schema_mappings = {}
  65. for json_file in arguments.json_files:
  66. try:
  67. (schema_file, schema_data) = discover_schema_file(json_file)
  68. except OSError as e:
  69. logger.error(f"❌ Failed to discover schema for file '{json_file}': {e}")
  70. return 2
  71. if schema_file and schema_file not in schema_mappings.keys():
  72. schema_mappings.update(
  73. {schema_file: {"schema_data": schema_data, "files": set()}}
  74. )
  75. schema_mappings[schema_file]["files"].add(json_file)
  76. validation_errors = []
  77. for schema_entry in schema_mappings.values():
  78. for json_file in schema_entry["files"]:
  79. try:
  80. new_errors = validate_json_files(schema_entry["schema_data"], json_file)
  81. except (InvalidInputError, OSError) as e:
  82. logger.error(
  83. f"❌ Failed to create JSON source map for file '{json_file}': {e}"
  84. )
  85. return 2
  86. [validation_errors.append(error) for error in new_errors]
  87. if validation_errors:
  88. try:
  89. with open("validation_errors.json", "w") as results_file:
  90. json.dump(validation_errors, results_file)
  91. except OSError as e:
  92. logger.error(f"❌ Failed to write validation results file: {e}")
  93. return 2
  94. return 1
  95. return 0
  96. if __name__ == "__main__":
  97. sys.exit(main())