Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added debian parser #3543

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cve_bin_tool/parsers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"php",
"perl",
"dart",
"debian",
]


Expand Down
138 changes: 138 additions & 0 deletions cve_bin_tool/parsers/deb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# Copyright (C) 2022 Intel Corporation
# SPDX-License-Identifier: GPL-3.0-or-later

import asyncio
import os
import re
import tempfile
from pathlib import Path

from cve_bin_tool.async_utils import aio_glob, aio_inpath, aio_run_command
from cve_bin_tool.extractor import BaseExtractor
from cve_bin_tool.parsers import Parser


class DebParser(Parser):
# Initialise the Debian package parser.
# Both arguments are forwarded unchanged to the base-class initialiser; this
# method sets up no state of its own.
def __init__(self, cve_db, logger) -> None:
super().__init__(cve_db, logger)

def parse_control_file(self, control_file):
    """Parse a Debian control file and return a dictionary of its fields.

    Args:
        control_file: Iterable of lines, either ``bytes`` (for example a file
            opened in binary mode) or ``str``.

    Returns:
        dict mapping each field name to its value, both stripped of
        surrounding whitespace. Lines that start with a space or tab are
        continuation lines: they are appended to the preceding field,
        separated by a newline, instead of being parsed as new fields.
        If an error interrupts parsing, the fields read so far are returned.
    """
    control_data = {}
    current_key = None
    try:
        for raw_line in control_file:
            if isinstance(raw_line, bytes):
                # Replace undecodable bytes so one bad character in, say, the
                # description cannot hide the Package and Version fields.
                line = raw_line.decode("utf-8", errors="replace")
            else:
                line = raw_line

            if not line.strip():
                # A blank line ends the current stanza.
                current_key = None
                continue

            if line[0] in " \t":
                # Continuation of the previous field, even if it contains ":".
                if current_key is not None:
                    extra = line.strip()
                    previous = control_data[current_key]
                    control_data[current_key] = (
                        f"{previous}\n{extra}" if previous else extra
                    )
                continue

            key, separator, value = line.partition(":")
            if not separator:
                # Not a "Field: value" line; ignore it.
                current_key = None
                continue
            current_key = key.strip()
            control_data[current_key] = value.strip()
    except Exception as e:
        # Deliberately best-effort: the input is an arbitrary package file, so
        # a failure is logged and whatever was parsed so far is kept.
        self.logger.debug(f"An error occurred while parsing the control file: {e}")
    return control_data

async def unpack_tar_xz(self, archive_path, extraction_path):
    """Unpack a tar archive into extraction_path without blocking the loop.

    The blocking ``BaseExtractor.extract_file_tar`` call is run in the event
    loop's default executor and awaited.

    Args:
        archive_path: Path of the tar archive to unpack.
        extraction_path: Directory the archive is unpacked into.

    NOTE(review): despite the method name, the archive is handed to
    ``extract_file_tar`` as-is; which compression formats that helper accepts
    is not visible from here and should be confirmed.
    """
    # Inside a coroutine the running loop is the one to use;
    # get_running_loop() is the documented call for that.
    loop = asyncio.get_running_loop()
    extractor = BaseExtractor()
    await loop.run_in_executor(
        None, extractor.extract_file_tar, archive_path, extraction_path
    )

async def extract_control(self, filename):
    """Extract and parse the control file from a Debian binary package.

    Steps:
      1. If the ``file`` utility is available, use it to confirm that
         filename is a Debian binary package. When ``file`` is missing the
         check is skipped and the input is treated as a package.
      2. Confirm ``ar`` is available before running it.
      3. List the archive members with ``ar t`` and extract only the control
         archive (``control.tar.*``) into a temporary directory. No other
         member is written to disk, and the member name is required to be a
         plain file name so it cannot point outside the temporary directory.
      4. Unpack the control archive and parse its ``control`` file.

    Args:
        filename: Path to the package; may be relative to the current
            working directory.

    Returns:
        dict of control fields as produced by ``parse_control_file``. It is
        empty when the input is not a Debian package, ``ar`` is missing, or
        no control archive / control file is found.
    """
    control_data = {}

    process_can_fail = False
    if await aio_inpath("file"):
        stdout, _stderr, _return_code = await aio_run_command(
            ["file", filename], process_can_fail
        )
        if not re.search(b"Debian binary package", stdout):
            self.logger.debug(f"{filename} is not a Debian binary package")
            return control_data

    if not await aio_inpath("ar"):
        self.logger.debug("ar tool not found")
        return control_data

    # ar is run from inside the temporary directory, so a relative path must
    # be resolved before the working directory changes.
    package_path = os.path.abspath(filename)

    listing, _stderr, _return_code = await aio_run_command(["ar", "t", package_path])
    control_members = [
        name
        for name in listing.decode("utf-8", errors="replace").split()
        if name.startswith("control.tar.") and os.path.basename(name) == name
    ]
    if not control_members:
        self.logger.debug("Control archive not found.")
        return control_data

    with tempfile.TemporaryDirectory() as temp_dir:
        # ar extracts into the current directory. os.chdir is process-wide,
        # so the original directory is always restored, even if ar fails.
        original_dir = os.getcwd()
        os.chdir(temp_dir)
        try:
            await aio_run_command(["ar", "x", package_path, control_members[0]])
        finally:
            os.chdir(original_dir)

        # Check what actually landed on disk rather than trusting the listing.
        control_tar_files = await aio_glob(str(Path(temp_dir) / "control.tar.*"))
        self.logger.debug(f"Files extracted: {control_tar_files}")
        if not control_tar_files:
            self.logger.debug("Control archive not found.")
            return control_data

        await self.unpack_tar_xz(control_tar_files[0], temp_dir)

        # Parse the control file while the temporary directory still exists.
        control_file_path = Path(temp_dir, "control")
        self.logger.debug(control_file_path)
        if control_file_path.exists():
            with open(control_file_path, "rb") as control_file:
                control_data = self.parse_control_file(control_file)

    return control_data

def run_checker(self, filename):
    """Yield the vendor matches for the Debian package at filename.

    The control data is obtained by running ``self.extract_control`` to
    completion on a fresh event loop. When both ``Package`` and ``Version``
    are present, whatever ``self.find_vendor`` returns for them is yielded.

    Args:
        filename: Path of the package to inspect.

    Yields:
        The items of ``self.find_vendor(package, version)``; nothing when a
        field is missing, no vendor is found, or an error occurs (errors are
        logged at debug level, not raised).
    """
    try:
        # Create a new event loop for this call
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            # Run the async function and wait for the result
            control_data = loop.run_until_complete(self.extract_control(filename))
        finally:
            # Close the loop even when extraction raises
            loop.close()

        # Fields may be absent ("Essential" usually is), so use .get():
        # indexing would raise KeyError and abort the lookup below.
        package = control_data.get("Package")
        version = control_data.get("Version")
        architecture = control_data.get("Architecture")
        essential = control_data.get("Essential")

        if package:
            self.logger.debug(f"Package name is {package}")
        else:
            self.logger.debug("Package not found")

        if version:
            self.logger.debug(f"Version:{version}")
        else:
            self.logger.debug("No Version Found")

        if architecture:
            self.logger.debug(f"architecture name is {architecture}")
        else:
            self.logger.debug("architecture not found")

        if essential:
            self.logger.debug(f"essential name is {essential}")
        else:
            self.logger.debug("essential not found")

        if package and version:
            vendor = self.find_vendor(package, version)
            if vendor is not None:
                yield from vendor
    except Exception as e:
        self.logger.debug(f"Some Error occurred while parsing the file {e}")

    self.logger.debug(f"Done parsing file {filename}")
2 changes: 2 additions & 0 deletions cve_bin_tool/parsers/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later

from cve_bin_tool.parsers.dart import DartParser
from cve_bin_tool.parsers.deb import DebParser
from cve_bin_tool.parsers.go import GoParser
from cve_bin_tool.parsers.java import JavaParser
from cve_bin_tool.parsers.javascript import JavascriptParser
Expand All @@ -26,6 +27,7 @@
"Package.resolved": SwiftParser,
"composer.lock": PhpParser,
"cpanfile": PerlParser,
"test.deb": DebParser,
"pubspec.lock": DartParser,
}

Expand Down
Binary file added test/language_data/test.deb
Binary file not shown.
20 changes: 19 additions & 1 deletion test/test_language_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class TestLanguageScanner:

SWIFT_PRODUCTS = ["alliance_web_platform"]

DEBIAN_PRODUCTS = []

DART_PRODUCTS = ["dio", "archive"]

@classmethod
Expand Down Expand Up @@ -225,6 +227,7 @@ def test_language_package_none_found(self, filename: str) -> None:
(str(TEST_FILE_PATH / "Package.resolved"), SWIFT_PRODUCTS),
(str(TEST_FILE_PATH / "composer.lock"), PHP_PRODUCTS),
(str(TEST_FILE_PATH / "cpanfile"), PERL_PRODUCTS),
(str(TEST_FILE_PATH / "test.deb"), DEBIAN_PRODUCTS),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have test_language_scanner create the .deb file rather than checking it in to the test directory?

We've typically just put small test files in the directory and let it be, but it's hurting our OpenSSF score when we provide things like .deb packages that are basically installable (weirdly, it doesn't flag on the thousand .tar.gz files... yet).

I think .deb files use mostly tools we already have installed so it should be possible to write python or a makefile to generate the file here and add some code to skip the test if the file can't be built. Sorry that you get stuck as a guinea pig here!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Umm, I was working on this and was able to write something like this..

import os
import subprocess

def create_debian_package(directory, package_name, version, architecture, description, maintainer):
    """Write a minimal DEBIAN/control file and build a .deb with dpkg-deb.

    Args:
        directory: Package root; ``DEBIAN/control`` is created beneath it.
        package_name, version, architecture, description, maintainer:
            Values written to the matching control fields.

    Returns:
        Name of the built package, ``{package_name}_{version}_{architecture}.deb``,
        which dpkg-deb is asked to write relative to the current directory.

    Raises:
        FileNotFoundError: if ``dpkg-deb`` is not installed.
        subprocess.CalledProcessError: if ``dpkg-deb`` exits with an error.
            Callers (for example a test fixture) can catch either one to
            skip when the package cannot be built.
    """
    # Create the necessary directory structure
    debian_dir = os.path.join(directory, 'DEBIAN')
    os.makedirs(debian_dir, exist_ok=True)

    # Create the control file
    control_content = (
        f"Package: {package_name}\n"
        f"Version: {version}\n"
        f"Architecture: {architecture}\n"
        f"Maintainer: {maintainer}\n"
        f"Description: {description}\n"
    )
    # Explicit encoding so the file does not depend on the locale
    with open(os.path.join(debian_dir, 'control'), 'w', encoding='utf-8') as control_file:
        control_file.write(control_content)

    # Build the package; check=True surfaces a failed build instead of
    # silently leaving no .deb behind.
    output_name = f'{package_name}_{version}_{architecture}.deb'
    subprocess.run(['dpkg-deb', '--build', directory, output_name], check=True)
    return output_name

if __name__ == '__main__':
    # Example invocation: build a tiny architecture-independent package.
    example_settings = {
        'directory': 'mypackage',
        'package_name': 'mypackage',
        'version': '1.0',
        'architecture': 'all',
        'description': 'Example package',
        'maintainer': 'Joydeep <[email protected]>',
    }
    create_debian_package(**example_settings)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I add this file in the current PR or as a different PR?
This one has been going on for way too long 🥲🥲

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test.deb can of course be created for testing. But wouldn't that take up extra memory space?
Even if I create the Debian package as a temporary file every time, it would still be extra work.

(str(TEST_FILE_PATH / "pubspec.lock"), DART_PRODUCTS),
],
)
Expand All @@ -233,6 +236,7 @@ def test_language_package(self, filename: str, products: set[str]) -> None:
scanner = VersionScanner()
scanner.file_stack.append(filename)
found_product = []
file_path = None
for product in scanner.scan_file(filename):
if product:
product_info, file_path = product
Expand All @@ -242,7 +246,8 @@ def test_language_package(self, filename: str, products: set[str]) -> None:
# expanded out to make missing products easier to spot
for p in products:
assert p in found_product
assert file_path == filename
if file_path:
assert file_path == filename

@pytest.mark.parametrize("filename", ((str(TEST_FILE_PATH / "PKG-INFO")),))
def test_python_package(self, filename: str) -> None:
Expand All @@ -256,3 +261,16 @@ def test_python_package(self, filename: str) -> None:
"facebook", "zstandard", "0.18.0", "/usr/local/bin/product"
)
assert file_path == filename

@pytest.mark.parametrize("filename", ((str(TEST_FILE_PATH / "test.deb")),))
def test_debian_control(self, filename: str) -> None:
    """Scan a .deb fixture end to end and check how results are reported.

    The scan must complete without raising, every reported product must be
    attributed to the scanned file, and every product listed in
    DEBIAN_PRODUCTS (currently empty) must be found.
    """
    scanner = VersionScanner()
    scanner.file_stack.append(filename)
    found_product = []
    for product in scanner.scan_file(filename):
        if product:
            product_info, file_path = product
            # Same attribution check as test_language_package
            assert file_path == filename
            if product_info.product not in found_product:
                found_product.append(product_info.product)
    # Replaces `assert found_product is not None`, which can never fail
    # because found_product is always a list.
    for p in self.DEBIAN_PRODUCTS:
        assert p in found_product
Loading