Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add validation script for nvd json files #3723

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions validate_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import gzip
import json
import os
import zipfile
import hashlib
import logging
from jsonschema import validate
from jsonschema.exceptions import ValidationError
import requests


# Module-wide logger. getLogger() with no name returns the root logger, which
# basicConfig() configures to emit records at INFO level and above.
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)


def calculate_sha256(file_path):
    """Return the SHA-256 hex digest of the JSON member inside a zip archive.

    The member name is the archive's own base name with its last extension
    removed (``feed.json.zip`` -> ``feed.json``). The member is decompressed
    and hashed in 4096-byte chunks, so it is never held in memory as a whole.

    Args:
        file_path: Path to the ``.zip`` archive.

    Returns:
        The lowercase hexadecimal digest of the member's uncompressed bytes.
    """
    digest = hashlib.sha256()
    with zipfile.ZipFile(file_path, "r") as archive:
        archive_name = os.path.basename(file_path)
        member_name, _extension = os.path.splitext(archive_name)
        with archive.open(member_name, "r") as member:
            while True:
                chunk = member.read(4096)
                if chunk == b"":
                    # An empty read marks the end of the member.
                    break
                digest.update(chunk)
    return digest.hexdigest()


def parse_meta_content(meta_content):
    """Parse the text of a ``.meta`` file into a dict of strings.

    Every non-blank line must have the form ``key:value``. Only the first
    colon separates key from value, so values that themselves contain colons
    (timestamps, for example) are kept intact. Keys and values are stripped
    of surrounding whitespace. Blank or whitespace-only lines are ignored.

    Args:
        meta_content: Full text of the ``.meta`` file.

    Returns:
        A dict mapping each key to its value, both as ``str``.

    Raises:
        ValueError: If a non-blank line contains no ``:`` separator.
    """
    meta_data = {}
    for line_number, line in enumerate(meta_content.splitlines(), start=1):
        if not line.strip():
            # Blank lines (such as a trailing empty line) carry no data.
            continue
        key, separator, value = line.partition(":")
        if not separator:
            raise ValueError(
                f"Malformed meta line {line_number}: "
                f"expected 'key:value', got {line!r}"
            )
        meta_data[key.strip()] = value.strip()
    return meta_data


def _meta_int_matches(meta_data, key, actual):
    """Return True if ``meta_data[key]`` exists, is an integer and equals ``actual``."""
    raw_value = meta_data.get(key)
    if raw_value is None:
        return False
    try:
        return int(raw_value) == actual
    except ValueError:
        # A non-numeric value in the .meta file is reported as a mismatch.
        return False


def _validate_feed(directory, base_filename):
    """Check one feed's ``.json.zip`` / ``.json.gz`` pair against its ``.meta`` file.

    Checks run in this order: size, zipSize, gzSize, sha256.

    Raises:
        ValueError: On the first failed check, with the feed name in the message.
        zipfile.BadZipFile: If the ``.json.zip`` file is not a readable archive.
    """
    meta_path = os.path.join(directory, f"{base_filename}.meta")
    zip_path = os.path.join(directory, f"{base_filename}.json.zip")
    gz_path = os.path.join(directory, f"{base_filename}.json.gz")
    json_filename = f"{base_filename}.json"

    with open(meta_path, "r", encoding="utf-8") as meta_file:
        meta_content = meta_file.read()
    try:
        meta_data = parse_meta_content(meta_content)
    except ValueError as e:
        # Add the feed name so the failure report says which file is broken.
        raise ValueError(f"Malformed .meta file for {base_filename}: {e}") from e

    with zipfile.ZipFile(zip_path, "r") as zip_file:
        try:
            member_info = zip_file.getinfo(json_filename)
        except KeyError:
            raise ValueError(
                f"File {json_filename} not found in the zip archive for {base_filename}"
            ) from None

    # Validate size: the uncompressed size recorded in the archive directory.
    # The member's bytes are fully read (and CRC-checked by zipfile) later,
    # during the sha256 step, so the payload need not be loaded here as well.
    if not _meta_int_matches(meta_data, "size", member_info.file_size):
        raise ValueError(f"Size mismatch for {base_filename}")

    # Validate zipSize against the archive's on-disk size. Zip container
    # overhead varies with the member-name length, so it cannot be modelled
    # as "compressed size + a fixed constant".
    # NOTE(review): assumes zipSize in the .meta file is the byte size of the
    # .zip file itself, mirroring how gzSize is checked - confirm.
    if not _meta_int_matches(meta_data, "zipSize", os.path.getsize(zip_path)):
        raise ValueError(f"zipSize mismatch for {base_filename}")

    # Validate gzSize against the on-disk size of the .gz file.
    if not _meta_int_matches(meta_data, "gzSize", os.path.getsize(gz_path)):
        raise ValueError(f"gzSize mismatch for {base_filename}")

    # TODO: JSON-schema validation (json_validation) is not wired in yet. When
    # it is, derive the year defensively: a feed whose name does not end in a
    # numeric year cannot be passed through int().

    # Validate sha256, comparing case-insensitively.
    expected_sha256 = meta_data.get("sha256")
    if (
        expected_sha256 is None
        or calculate_sha256(zip_path).upper() != expected_sha256.upper()
    ):
        raise ValueError(f"sha256 mismatch for {base_filename}")


def validate_meta_info(directory):
    """Validate every ``.meta`` file in ``directory`` against its feed archives.

    For each ``<name>.meta`` file the companion ``<name>.json.zip`` and
    ``<name>.json.gz`` files are checked for matching size, zipSize, gzSize
    and sha256. Feeds missing either companion file are skipped with a
    warning. All feeds are checked before the result is reported.

    Args:
        directory: Path of the directory to scan.

    Raises:
        ValueError: If at least one feed failed validation; the individual
            failures are logged at ERROR level first.
    """
    logger.info("Scanning Files ......\n")
    validation_failures = []

    # Sorted so that the failure report is in a deterministic order.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(".meta"):
            continue
        base_filename, _ = os.path.splitext(filename)

        zip_path = os.path.join(directory, f"{base_filename}.json.zip")
        gz_path = os.path.join(directory, f"{base_filename}.json.gz")
        if not (os.path.exists(zip_path) and os.path.exists(gz_path)):
            logger.warning(f"Skipping {base_filename}, missing .json.zip or .json.gz")
            continue

        try:
            _validate_feed(directory, base_filename)
        except zipfile.BadZipFile as e:
            # A corrupt archive is a validation failure for that feed, not a
            # reason to abort the scan of the remaining feeds.
            validation_failures.append(
                f"Corrupt zip archive for {base_filename}: {e}"
            )
        except ValueError as e:
            validation_failures.append(str(e))

    if validation_failures:
        logger.error("Validation failed for the following files:")
        for failure in validation_failures:
            logger.error(failure)
        raise ValueError("Validation failed for one or more files.")
    logger.info("Validation successful for all files.")


def json_validation(gz_path, year, schema=None):
    """Validate a gzipped JSON feed against the NVD CVE feed 1.1 JSON schema.

    Args:
        gz_path: Path to the ``.json.gz`` feed file.
        year: Feed year; used only in log messages.
        schema: Optional, already-loaded schema dict. When omitted, the schema
            is downloaded from the NVD schema URL. Passing it in lets a caller
            that validates many feeds download the schema only once.

    Returns:
        True if the feed validates against the schema, False if ``jsonschema``
        raised a ``ValidationError`` (which is logged at ERROR level).

    Raises:
        requests.RequestException: If the schema download fails or returns an
            HTTP error status.
    """
    NVD_SCHEMA = (
        "https://scap.nist.gov/schema/nvd/feed/1.1/nvd_cve_feed_json_1.1.schema"
    )
    if schema is None:
        response = requests.get(NVD_SCHEMA, timeout=300)
        # Fail on HTTP errors rather than trying to parse an error page as JSON.
        response.raise_for_status()
        schema = response.json()
    logger.info("Schema loaded successfully")
    # The schema is large, so it is logged at DEBUG. The "%s" placeholder is
    # required: extra logging arguments are only rendered through placeholders.
    logger.debug("schema: %s", schema)

    with gzip.open(gz_path, "rb") as gz_file:
        nvd_json = json.load(gz_file)
    logger.info(f"Loaded json for year {year}: nvdcve-1.1-{year}.json.gz")

    try:
        validate(nvd_json, schema)
    except ValidationError as ve:
        # Report the failure at ERROR level and signal it to the caller.
        logger.error(ve)
        return False
    logger.info(f"Validation complete for file of year {year}")
    return True


if __name__ == "__main__":
    import sys

    # Directory to scan: the first command-line argument when given, otherwise
    # the placeholder path the script shipped with.
    directory_path = sys.argv[1] if len(sys.argv) > 1 else "path_to_directory"
    try:
        validate_meta_info(directory_path)
    except ValueError as e:
        logger.error(f"Error: {e}")
        # Exit non-zero so shells and CI jobs can detect a failed validation.
        sys.exit(1)
Loading