diff --git a/mutablesecurity/solutions/implementations/duplicati/code.py b/mutablesecurity/solutions/implementations/duplicati/code.py new file mode 100644 index 0000000..4a657a1 --- /dev/null +++ b/mutablesecurity/solutions/implementations/duplicati/code.py @@ -0,0 +1,693 @@ +"""Module defining a dummy security solution, for testing purposes.""" + +# pylint: disable=protected-access +# pylint: disable=missing-class-docstring +# pylint: disable=unused-argument +# pylint: disable=unexpected-keyword-arg + + +import os +import typing +import uuid +from logging.config import IDENTIFIER + +from pyinfra.api.deploy import deploy +from pyinfra.api.facts import FactBase +from pyinfra.operations import apt, files, server + +from mutablesecurity.helpers.data_type import StringDataType +from mutablesecurity.solutions.base import ( + BaseAction, + BaseInformation, + BaseLog, + BaseSolution, + BaseSolutionException, + BaseTest, + InformationProperties, + TestType, +) +from mutablesecurity.solutions.common.facts.bash import PresentCommand + + +class IncompatibleArchitectureException(BaseSolutionException): + """Your architecture does not support any teler build.""" + + +# Actions classes definitions + + +class LocalBackup(BaseAction): + @staticmethod + @deploy + def local_backup(source_file: str, backup_location: str) -> None: + command = DuplicatiHelper._make_local_backup( + source_file, backup_location + ) + server.shell( + commands=[command], + name="Backup files on localhost.", + ) + + IDENTIFIER = "local_backup" + DESCRIPTION = "Save files local" + ACT = local_backup + + +class RestoreLocalBackup(BaseAction): + @staticmethod + @deploy + def restore_local_backup(source_file: str, backup_location: str) -> None: + command = DuplicatiHelper._make_local_backup( + source_file, backup_location, reverse=True + ) + server.shell( + commands=[command], + name="Restore backup files on localhost.", + ) + + IDENTIFIER = "restore_local_backup" + DESCRIPTION = "Restore backup files on localhost" + ACT = restore_local_backup + + +class SshBackup(BaseAction): + @staticmethod + @deploy + def ssh_backup( + source_file: str, + server_ip: str, + username: str, + password: str, + passphrase: str, + ) -> None: + command = DuplicatiHelper._make_ssh_backup( + source_file, server_ip, username, password, passphrase + ) + server.shell( + commands=[command], + name="Save file over SSH", + ) + + IDENTIFIER = "ssh_backup" + DESCRIPTION = "Save files on remote computer via SSH" + ACT = ssh_backup + + +class RestoreSshBackup(BaseAction): + @staticmethod + @deploy + def restore_ssh_backup( + source_file: str, server_ip: str, username: str, password: str + ) -> None: + command = DuplicatiHelper._make_ssh_backup( + source_file, server_ip, username, password, reverse=True + ) + server.shell( + commands=[command], + name="Restore backup file over SSH", + ) + + IDENTIFIER = "restore_ssh_backup" + DESCRIPTION = "Restore files on localhost from remote computer over SSH" + ACT = restore_ssh_backup + + +class GoogleDriveBackup(BaseAction): + @staticmethod + @deploy + def google_drive_backup( + source_file: str, backup_location: str, oauth_token: str + ) -> None: + command = DuplicatiHelper._make_googledrive_backup( + source_file, backup_location, oauth_token + ) + server.shell( + commands=[command], + name="Save file to Google Drive", + ) + + IDENTIFIER = "google_drive_backup" + DESCRIPTION = "Save files to Google Drive" + ACT = google_drive_backup + + +class RestoreGoogleDriveBackup(BaseAction): + @staticmethod + @deploy + def restore_google_drive_backup( + source_file: str, backup_location: str, oauth_token: str + ) -> None: + command = DuplicatiHelper._make_googledrive_backup( + source_file, backup_location, oauth_token, reverse=True + ) + server.shell( + commands=[command], + name="Get backup file from Google Drive", + ) + + IDENTIFIER = "restore_google_drive_backup" + DESCRIPTION = "Get backup file from Google Drive" + ACT = restore_google_drive_backup + + +# info + +# Information classes definitions + + +class BinaryArchitectureFact(FactBase): + command = "dpkg --print-architecture" + + @staticmethod + def process(output: typing.List[str]) -> str: + architecture = output[0] + + if architecture in ["386", "amd64", "arm64", "armv6"]: + return architecture + else: + raise IncompatibleArchitectureException() + + +class BinaryArchitecture(BaseInformation): + IDENTIFIER = "architecture" + DESCRIPTION = "Binary's architecture" + INFO_TYPE = StringDataType + PROPERTIES = [ + InformationProperties.CONFIGURATION, + InformationProperties.READ_ONLY, + InformationProperties.AUTO_GENERATED_BEFORE_INSTALL, + ] + DEFAULT_VALUE = None + GETTER = BinaryArchitectureFact + SETTER = None + + +class EncryptionModule(BaseInformation): + @staticmethod + @deploy + def set_configuration( + old_value: typing.Any, new_value: typing.Any + ) -> None: + template_path = os.path.join( + os.path.dirname(__file__), "files/duplicati.conf.j2" + ) + j2_values = {"encryption_module": EncryptionModule.get()} + files.template( + src=template_path, + dest="/opt/mutablesecurity/duplicati/duplicati.conf", + configuration=j2_values, + name="Copy the generated configuration into duplicati's folder.", + ) + + @staticmethod + @deploy + class EncryptionModuleValue(FactBase): + command = "echo 'hi'" + + @staticmethod + def process(output: typing.List[str]) -> str: + return ( + "cat /opt/mutablesecurity/duplicati/duplicati.conf | grep" + f" {IDENTIFIER} | cut -d : -f 2" + ) + + IDENTIFIER = "encryption_module" + DESCRIPTION = "Algorithm used for encrytion" + INFO_TYPE = StringDataType + PROPERTIES = [ + InformationProperties.CONFIGURATION, + InformationProperties.OPTIONAL, + InformationProperties.WITH_DEFAULT_VALUE, + InformationProperties.NON_DEDUCTIBLE, + InformationProperties.WRITABLE, + ] + DEFAULT_VALUE = "aes" + GETTER = EncryptionModuleValue + SETTER = set_configuration + + +class CompressionModule(BaseInformation): + @staticmethod + @deploy + def set_configuration( + old_value: typing.Any, new_value: typing.Any + ) -> None: + template_path = os.path.join( + os.path.dirname(__file__), "files/duplicati.conf.j2" + ) + j2_values = {"compression_module": CompressionModule.get()} + files.template( + src=template_path, + dest="/opt/mutablesecurity/duplicati/duplicati.conf", + configuration=j2_values, + name="Copy the generated configuration into duplicati's folder.", + ) + + @staticmethod + @deploy + class CompressionModuleValue(FactBase): + command = "echo 'hi'" + + @staticmethod + def process(output: typing.List[str]) -> str: + return ( + "cat /opt/mutablesecurity/duplicati/duplicati.conf | grep" + f" {IDENTIFIER} | cut -d : -f 2" + ) + + IDENTIFIER = "compression_module" + DESCRIPTION = "Algorithm used from compression" + INFO_TYPE = StringDataType + PROPERTIES = [ + InformationProperties.CONFIGURATION, + InformationProperties.OPTIONAL, + InformationProperties.WITH_DEFAULT_VALUE, + InformationProperties.NON_DEDUCTIBLE, + InformationProperties.WRITABLE, + ] + DEFAULT_VALUE = "zip" + GETTER = CompressionModuleValue + SETTER = set_configuration + + +class SkipFilesLarger(BaseInformation): + @staticmethod + @deploy + def set_configuration( + old_value: typing.Any, new_value: typing.Any + ) -> None: + template_path = os.path.join( + os.path.dirname(__file__), "files/duplicati.conf.j2" + ) + j2_values = {"skip_files_larger_than": SkipFilesLarger.get()} + files.template( + src=template_path, + dest="/opt/mutablesecurity/duplicati/duplicati.conf", + configuration=j2_values, + name="Copy the generated configuration into duplicati's folder.", + ) + + @staticmethod + @deploy + class SkipLargerFilesValue(FactBase): + command = "echo 'hi'" + + @staticmethod + def process(output: typing.List[str]) -> str: + return ( + "cat /opt/mutablesecurity/duplicati/duplicati.conf | grep" + f" {IDENTIFIER} | cut -d : -f 2" + ) + + IDENTIFIER = "skip_files_larger_than" + DESCRIPTION = ( + "Don't backup files which heve size larger than corresponding value" + ) + INFO_TYPE = StringDataType + PROPERTIES = [ + InformationProperties.CONFIGURATION, + InformationProperties.OPTIONAL, + InformationProperties.WITH_DEFAULT_VALUE, + InformationProperties.NON_DEDUCTIBLE, + InformationProperties.WRITABLE, + ] + DEFAULT_VALUE = "2GB" + GETTER = SkipLargerFilesValue + SETTER = set_configuration + + +class ExcludeFilesAttributes(BaseInformation): + @staticmethod + @deploy + def set_configuration( + old_value: typing.Any, new_value: typing.Any + ) -> None: + template_path = os.path.join( + os.path.dirname(__file__), "files/duplicati.conf.j2" + ) + j2_values = {"exclude_files_attributes": ExcludeFilesAttributes.get()} + files.template( + src=template_path, + dest="/opt/mutablesecurity/duplicati/duplicati.conf", + configuration=j2_values, + name="Copy the generated configuration into duplicati's folder.", + ) + + @staticmethod + @deploy + class ExcludeFilesValue(FactBase): + command = "echo 'hi'" + + @staticmethod + def process(output: typing.List[str]) -> str: + return ( + "cat /opt/mutablesecurity/duplicati/duplicati.conf | grep" + f" {IDENTIFIER} | cut -d : -f 2" + ) + + IDENTIFIER = "exclude_files_attributes" + DESCRIPTION = "Don't backup files which have this attribute" + INFO_TYPE = StringDataType + PROPERTIES = [ + InformationProperties.CONFIGURATION, + InformationProperties.OPTIONAL, + InformationProperties.WITH_DEFAULT_VALUE, + InformationProperties.NON_DEDUCTIBLE, + InformationProperties.WRITABLE, + ] + DEFAULT_VALUE = "Temporary" + GETTER = ExcludeFilesValue + SETTER = set_configuration + + +class Passphrase(BaseInformation): + @staticmethod + @deploy + def set_configuration( + old_value: typing.Any, new_value: typing.Any + ) -> None: + template_path = os.path.join( + os.path.dirname(__file__), "files/duplicati.conf.j2" + ) + j2_values = {"passphrase": Passphrase.get()} + files.template( + src=template_path, + dest="/opt/mutablesecurity/duplicati/duplicati.conf", + configuration=j2_values, + name="Copy the generated configuration into duplicati's folder.", + ) + + @staticmethod + @deploy + class PassphraseValue(FactBase): + command = "echo 'hi'" + + @staticmethod + def process(output: typing.List[str]) -> str: + return ( + "cat /opt/mutablesecurity/duplicati/duplicati.conf | grep" + f" {IDENTIFIER} | cut -d : -f 2" + ) + + IDENTIFIER = "passphrase" + DESCRIPTION = "This value represents the value by encryption key" + INFO_TYPE = StringDataType + PROPERTIES = [ + InformationProperties.CONFIGURATION, + InformationProperties.OPTIONAL, + InformationProperties.WITH_DEFAULT_VALUE, + InformationProperties.NON_DEDUCTIBLE, + InformationProperties.WRITABLE, + ] + DEFAULT_VALUE = "mutablesecutiy" + GETTER = PassphraseValue + SETTER = set_configuration + + +class RestorePath(BaseInformation): + IDENTIFIER = "restore_path" + DESCRIPTION = "Path of directory you want to restore your backup" + INFO_TYPE = StringDataType + PROPERTIES = [ + InformationProperties.CONFIGURATION, + InformationProperties.OPTIONAL, + InformationProperties.WITH_DEFAULT_VALUE, + InformationProperties.NON_DEDUCTIBLE, + InformationProperties.WRITABLE, + ] + DEFAULT_VALUE = "/opt/mutablesecurity/duplicati/restorePath" + GETTER = None + SETTER = None + + +# Logs classes definitions +class DefaultLogs(BaseLog): + class DefaultLogsFact(FactBase): + command = "cat /var/log/duplicati.log" + + @staticmethod + def process(output: typing.List[str]) -> str: + return "\n".join(output) + + IDENTIFIER = "logs" + DESCRIPTION = "Default log location" + FACT = DefaultLogsFact + + +# Tests +class SupportedArchitecture(BaseTest): + class SupportedArchitectureFact(BinaryArchitectureFact): + @staticmethod + def process( # type: ignore[override] + output: typing.List[str], + ) -> bool: + architecture = BinaryArchitectureFact.process(output) + + return architecture is not None + + IDENTIFIER = "supported_architecture" + DESCRIPTION = "Checks if there is any build for this architecture." + TEST_TYPE = TestType.REQUIREMENT + FACT = SupportedArchitectureFact + + +class ClientCommandPresence(BaseTest): + IDENTIFIER = "command" + DESCRIPTION = "Checks if the Duplici cli is registered as a command." + TEST_TYPE = TestType.PRESENCE + FACT = PresentCommand + FACT_ARGS = ("duplicati-cli --help",) + + +class ClientEncryption(BaseTest): + class EncryptionTest(FactBase): + @staticmethod + def command() -> str: + backup_path = "/tmp/backup" + uuid.uuid4().hex + file_test = "/tmp/file.txt" + execute_command = [ + "echo 'Hi' >> f{file_test}", + DuplicatiHelper._make_local_backup(file_test, backup_path) + + f" && dir {backup_path} |grep '*.f{EncryptionModule.get()}'", + f"rm -r {backup_path}", + ] + + return execute_command + + @staticmethod + def process(output: typing.List[str]) -> bool: + return int(output[0]) != 0 + + IDENTIFIER = "encryption_test" + DESCRIPTION = "Checks if the Duplici local encryption works" + TEST_TYPE = TestType.SECURITY + FACT = EncryptionTest + + +# Solution class definition + + +class Duplicati(BaseSolution): + INFORMATION = [ + BinaryArchitecture, + CompressionModule, + EncryptionModule, + ExcludeFilesAttributes, + SkipFilesLarger, + RestorePath, + Passphrase, + ] + TESTS = [ + ClientEncryption, + SupportedArchitecture, + ClientCommandPresence, + ] + LOGS = [ + DefaultLogs, # type: ignore[list-item] + ] + ACTIONS = [ + LocalBackup, # type: ignore[list-item] + RestoreLocalBackup, # type: ignore[list-item] + SshBackup, # type: ignore[list-item] + RestoreSshBackup, # type: ignore[list-item] + GoogleDriveBackup, # type: ignore[list-item] + RestoreGoogleDriveBackup, # type: ignore[list-item] + ] + + @staticmethod + @deploy + def _install() -> None: + architecture = BinaryArchitecture.get() + if not architecture: + raise IncompatibleArchitectureException() + + release_url = ( + "https://updates.duplicati.com/beta/duplicati_2.0.6.3-1_all.deb" + ) + apt.deb( + name="Install Duplicati via deb", + src=release_url, + ) + + template_path = os.path.join( + os.path.dirname(__file__), "files/duplicati.conf.j2" + ) + j2_values = { + "encryption_module": EncryptionModule.get(), + "compression_module": CompressionModule.get(), + "skip_files_larger_than": SkipFilesLarger.get(), + "exclude_files_attributes": ExcludeFilesAttributes.get(), + "passphrase": Passphrase.get(), + } + files.template( + src=template_path, + dest="/opt/mutablesecurity/duplicati/duplicati.conf", + configuration=j2_values, + name="Copy the generated configuration into Duplicati's folder.", + ) + + @staticmethod + @deploy + def _uninstall(remove_logs: bool = True) -> None: + apt.packages( + packages=["duplicati"], + present=False, + extra_uninstall_args="--purge", + name="Uninstalls Duplicati via apt.", + ) + files.directory( + name="Remove duplicati executable and configuration.", + path="/opt/mutablesecurity/duplicati", + present=False, + ) + files.directory( + name="Remove duplicati log file.", + path="/var/log/duplicati.log", + present=False, + force=True, + ) + + @staticmethod + @deploy + def _update() -> None: + apt.packages( + packages=["duplicati"], + latest=True, + name="Update duplicati via apt.", + ) + + +class DuplicatiHelper: + @staticmethod + def _load_default_param() -> str: + command_params = { + "encryptionModule": " --encryption-module=" + + EncryptionModule.get(), + "compressionModule": " --compression-module=" + + CompressionModule.get(), + "skipFilesLrger": " --skip-files-larger-than=" + + SkipFilesLarger.get(), + "excludeFilesAtt": " --exclude-files-attributes=" + + ExcludeFilesAttributes.get(), + "logFile": " --log-file=/var/log/duplicati.log", + } + + if EncryptionModule.get() == "none": + command_params["encryptionModule"] = " " + if CompressionModule.get() == "none": + command_params["compressionModule"] = " " + + return command_params + + @staticmethod + def _make_local_backup( + source_file: str, backup_location: str, reverse: bool = False + ) -> str: + params = DuplicatiHelper._load_default_param() + operation = 'backup "' + + if reverse: + operation = 'restore "' + restore_location = '" --restore-path="' + RestorePath.get() + backup_location = restore_location + + command = ( + "duplicati-cli " + + operation + + backup_location + + '" "' + + source_file + + '" ' + + " --passphrase=" + + Passphrase.get() + + " ".join(params.values()) + ) + return command + + @staticmethod + def _make_ssh_backup( + source_file: str, + server_ip: str, + username: str, + password: str, + reverse: bool = False, + ) -> str: + username = " --auth-username=" + username + password = " --auth-password=" + password + server_ip = "ssh://" + server_ip + '" ' + params = DuplicatiHelper._load_default_param() + operation = 'backup "' + + if reverse: + operation = 'restore "' + location = '" --restore-path="' + RestorePath.get() + server_ip = location + + command = ( + "duplicati-cli " + + operation + + server_ip + + '"' + + source_file + + '" ' + + username + + password + + " --passphrase=" + + Passphrase.get() + + " ".join(params.values()) + ) + + return command + + @staticmethod + def _make_googledrive_backup( + source_file: str, + backup_location: str, + oauth_token: str, + reverse: bool = False, + ) -> str: + backup_path = "googledrive://" + backup_location + authid = " --authid=" + oauth_token + params = DuplicatiHelper._load_default_param() + operation = 'backup "' + + if reverse: + operation = 'restore "' + restore_location = '" --restore-path="' + RestorePath.get() + backup_location = restore_location + + command = ( + "duplicati-cli " + + operation + + backup_path + + '" "' + + source_file + + '" ' + + authid + + params["encryptionModule"] + + " --passphrase=" + + Passphrase.get() + + " ".join(params.values()) + ) + + return command diff --git a/mutablesecurity/solutions/implementations/duplicati/files/duplicati.conf.j2 b/mutablesecurity/solutions/implementations/duplicati/files/duplicati.conf.j2 new file mode 100644 index 0000000..eec2763 --- /dev/null +++ b/mutablesecurity/solutions/implementations/duplicati/files/duplicati.conf.j2 @@ -0,0 +1,5 @@ +encryption_module : {{ configuration["encryption_module"] }} +compression_module : {{ configuration["compression_module"] }} +skip_files_larger_than : {{ configuration["skip_files_larger_than"] }} +exclude_files_attributes : {{ configuration["exclude_files_attributes"] }} +passphrase : {{ configuration["passphrase"] }} \ No newline at end of file diff --git a/mutablesecurity/solutions/implementations/duplicati/logo.png b/mutablesecurity/solutions/implementations/duplicati/logo.png new file mode 100644 index 0000000..9f5cba3 Binary files /dev/null and b/mutablesecurity/solutions/implementations/duplicati/logo.png differ diff --git a/mutablesecurity/solutions/implementations/duplicati/meta.yaml b/mutablesecurity/solutions/implementations/duplicati/meta.yaml new file mode 100644 index 0000000..c6aef2f --- /dev/null +++ b/mutablesecurity/solutions/implementations/duplicati/meta.yaml @@ -0,0 +1,8 @@ +full_name: Duplicati 2.0 +description: Duplicati is a backup client that securely stores encrypted, incremental, compressed backups on local storage, cloud storage services and remote file servers. +references: + - https://www.duplicati.com/ + - https://github.com/duplicati/duplicati +maturity: DEV_ONLY +categories: + - NONE