Skip to content

Commit

Permalink
Adding command line script
Browse files Browse the repository at this point in the history
  • Loading branch information
gershnik committed Jun 19, 2024
1 parent 44efdec commit a41c5f7
Show file tree
Hide file tree
Showing 11 changed files with 509 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

### Added
- Command line script to build repositories. Invoke with `repopulator` (or `python3 -m repopulator`)

### Changed
- AptRepo.add_distribution informational fields arguments are now optional rather than required.

## [1.0] - 2024-06-08

### Added
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,6 @@ Homepage = 'https://github.com/gershnik/repopulator'
Documentation = 'https://gershnik.github.io/repopulator'
Issues = 'https://github.com/gershnik/repopulator/issues'
Changelog = 'https://github.com/gershnik/repopulator/blob/master/CHANGELOG.md'

[project.scripts]
repopulator = 'repopulator.__main__:main'
340 changes: 340 additions & 0 deletions src/repopulator/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,340 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024, Eugene Gershnik
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE.txt file or at
# https://opensource.org/licenses/BSD-3-Clause

# pylint: disable=missing-function-docstring

"""Command line utility"""

from __future__ import annotations

import sys
import argparse

from abc import ABCMeta, abstractmethod
from pathlib import Path
from typing import Any, Sequence, Tuple

from repopulator.alpine import AlpineRepo
from repopulator.apt import AptPackage, AptRepo
from repopulator.freebsd import FreeBSDRepo
from repopulator.pacman import PacmanRepo
from repopulator.rpm import RpmRepo
from repopulator.pgp_signer import PgpSigner
from repopulator.pki_signer import PkiSigner


class _Handler(metaclass=ABCMeta):
@abstractmethod
def add_parser(self, key: str, subparsers: argparse._SubParsersAction[argparse.ArgumentParser]):
...

@abstractmethod
def handle(self, args: argparse.Namespace) -> int:
...

class _AlpineHandler(_Handler):
def add_parser(self, key: str, subparsers: argparse._SubParsersAction[argparse.ArgumentParser]):
parser: argparse.ArgumentParser = subparsers.add_parser(key, description='Create Alpine apk repo')

parser.add_argument('-o', '--output', dest='dest', type=Path, metavar='DEST',
help='output path to export the repository to')

parser.add_argument('-d', '--desc', type=str, dest='desc', required=True,
help='repository description')
parser.add_argument('-k', '--key', type=Path, dest='key_path', metavar='PATH', required=True,
help='path of the private key for signing. If -s/--signer option is not supplied '
'the stem of the private key filename is used as the name. '
'So for example a key [email protected] will result in [email protected] '
'being used as a signer name.')
parser.add_argument('-w', '--password', type=str, dest='key_password', metavar='PASSWORD',
help='private key password')
parser.add_argument('-s', '--signer', type=str, dest='signer',
help='name of the signer. This can be used to override name deduced from the key filename')

parser.add_argument('-p', '--packages', nargs='+', metavar='PACKAGE',
help='.apk file(s) to add to repository. To override apk architecture use filename:arch format. '
'For example foo-doc-1.1-r0.apk:x86_64')


def handle(self, args: argparse.Namespace):
desc: str = args.desc
packages: Sequence[str] = args.packages
key_path: Path = args.key_path
key_password: str | None = args.key_password
signer_name: str | None = args.signer
dest: Path = args.dest

if signer_name is None:
last_dot_idx = key_path.name.rfind('.')
if last_dot_idx == 0:
print('unable to determine signer name from the key, please use --signer option', file=sys.stderr)
return 1
signer_name = key_path.name[0:last_dot_idx]

print(f'Signing as {signer_name}')

repo = AlpineRepo(desc)
for p in packages:
parts = p.split(':', 2)
if len(parts) == 2:
print(f'Adding {parts[0]} with architecture {parts[1]}')
repo.add_package(parts[0], force_arch=parts[1])
else:
print(f'Adding {parts[0]}')
repo.add_package(parts[0])
signer = PkiSigner(key_path, key_password)

repo.export(dest, signer, signer_name)

return 0


class _FreeBSDHandler(_Handler):
def add_parser(self, key: str, subparsers: argparse._SubParsersAction[argparse.ArgumentParser]):
parser: argparse.ArgumentParser = subparsers.add_parser(key, description='Create FreeBSD pkg repo')

parser.add_argument('-o', '--output', dest='dest', type=Path, metavar='DEST',
help='output path to export the repository to')
parser.add_argument('-k', '--key', type=Path, dest='key_path', metavar='PATH', required=True,
help='path of the private key for signing.')
parser.add_argument('-w', '--password', type=str, dest='key_password', metavar='PASSWORD',
help='private key password')
parser.add_argument('-p', '--packages', nargs='+', metavar='PACKAGE',
help='.pkg file(s) to add to repository.')

def handle(self, args: argparse.Namespace):
packages: Sequence[str] = args.packages
key_path: Path = args.key_path
key_password: str | None = args.key_password
dest: Path = args.dest

repo = FreeBSDRepo()

for p in packages:
print(f'Adding {p}')
repo.add_package(p)

signer = PkiSigner(key_path, key_password)

repo.export(dest, signer)

return 0

class _RpmHandler(_Handler):
def add_parser(self, key: str, subparsers: argparse._SubParsersAction[argparse.ArgumentParser]):
parser: argparse.ArgumentParser = subparsers.add_parser(key, description='Create RPM repo')

parser.add_argument('-o', '--output', dest='dest', type=Path, metavar='DEST',
help='output path to export the repository to')
parser.add_argument('-k', '--key', type=Path, dest='key_name', metavar='NAME', required=True,
help='Name or ID of the GPG key for signing')
parser.add_argument('-w', '--password', type=str, dest='key_password', metavar='PASSWORD', required=True,
help='GPG key password')
parser.add_argument('-p', '--packages', nargs='+', metavar='PACKAGE',
help='.rpm file(s) to add to repository.')


def handle(self, args: argparse.Namespace):
packages: Sequence[str] = args.packages
key_name: str = args.key_name
key_password: str = args.key_password
dest: Path = args.dest

repo = RpmRepo()

for package in packages:
print(f'Adding {package}')
repo.add_package(package)

signer = PgpSigner(key_name=key_name, key_pwd=key_password)

repo.export(dest, signer)

return 0

class _PacmanHandler(_Handler):
def add_parser(self, key: str, subparsers: argparse._SubParsersAction[argparse.ArgumentParser]):
parser: argparse.ArgumentParser = subparsers.add_parser(key, description='Create Pacman repo')

parser.add_argument('-o', '--output', dest='dest', type=Path, metavar='DEST',
help='output path to export the repository to')

parser.add_argument('-n', '--name', type=str, dest='name', required=True,
help='repository name')
parser.add_argument('-k', '--key', type=Path, dest='key_name', metavar='NAME', required=True,
help='Name or ID of the GPG key for signing')
parser.add_argument('-w', '--password', type=str, dest='key_password', metavar='PASSWORD', required=True,
help='GPG key password')
parser.add_argument('-p', '--packages', nargs='+', metavar='PACKAGE',
help='.zst file to add to repository. If a .sig file with the same name exists next to it, '
' it will be automatically used to supply the package signature')


def handle(self, args: argparse.Namespace):
name: str = args.name
packages: Sequence[str] = args.packages
key_name: str = args.key_name
key_password: str = args.key_password
dest: Path = args.dest

repo = PacmanRepo(name)

for p in packages:
print(f'Adding {p}')
repo.add_package(p)

signer = PgpSigner(key_name=key_name, key_pwd=key_password)

repo.export(dest, signer)

return 0

class _AptDistroAction(argparse.Action):
def __call__(self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: str | Sequence[Any] | None,
option_string: str | None = None) -> None:
if not isinstance(values, str):
raise argparse.ArgumentError(self, 'distribution option must have a single value')
if not hasattr(namespace, 'distros'):
namespace.distros = {}
distro = namespace.distros.get(values)
if distro is None:
distro = argparse.Namespace()
distro.origin = None
distro.label = None
distro.suite = None
distro.codename = None
distro.version = None
distro.desc = None
distro.packages = []
namespace.distros[values] = distro
namespace.current_distro = distro


class _AptStoreAction(argparse._StoreAction):
def __call__(self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: str | Sequence[Any] | None,
option_string: str | None = None) -> None:
if not hasattr(namespace, 'distros'):
name = argparse._get_action_name(self)
raise argparse.ArgumentError(self, f'you must use --distro before {name}')
super().__call__(parser, namespace.current_distro, values, option_string)

class _AptHandler(_Handler):
def add_parser(self, key: str, subparsers: argparse._SubParsersAction[argparse.ArgumentParser]):
parser: argparse.ArgumentParser = subparsers.add_parser(key, description='Create APT repo')

parser.add_argument('-o', '--output', dest='dest', type=Path, metavar='DEST',
help='output path to export the repository to')
parser.add_argument('-k', '--key', type=Path, dest='key_name', metavar='NAME', required=True,
help='Name or ID of the GPG key for signing')
parser.add_argument('-w', '--password', type=str, dest='key_password', metavar='PASSWORD', required=True,
help='GPG key password')

parser.add_argument('-d', '--distro', type=str, dest='distro', metavar='DISTRO', required=True, action=_AptDistroAction,
help='Distribution name. This can be a relative path like `stable/updates`. All subsequent '
'per-distribution options apply to this distribution '
'Conversely this option is required to precede all per-distribution options. Multiple '
'distributions may be specified on the same command line')

parser.add_argument('-g', '--origin', type=str, dest='origin', metavar='STRING', required=False, action=_AptStoreAction,
help='current distribution origin')
parser.add_argument('-l', '--label', type=str, dest='label', metavar='STRING', required=False, action=_AptStoreAction,
help='current distribution label')
parser.add_argument('-s', '--suite', type=str, dest='suite', metavar='STRING', required=False, action=_AptStoreAction,
help='current distribution suite')
parser.add_argument('-c', '--codename', type=str, dest='codename', metavar='STRING', required=False, action=_AptStoreAction,
help='current distribution codename')
parser.add_argument('--dist-version', type=str, dest='version', metavar='STRING', required=False, action=_AptStoreAction,
help='current distribution version')
parser.add_argument('--desc', type=str, dest='desc', metavar='STRING', required=False, action=_AptStoreAction,
help='current distribution description')
parser.add_argument('-p', '--packages', nargs='+', metavar='PACKAGE', action=_AptStoreAction,
help='.deb file(s) to add to the current distribution. To specify a component for each package '
'use `filename:component` format. For example `foo-1.2.3_amd64.deb:contrib` will assign '
'foo-1.2.3_amd64.deb to contrib component. '
'If no component is specified `main` is assumed.')

def handle(self, args: argparse.Namespace):
distros: dict[str, argparse.Namespace] = args.distros
key_name: str = args.key_name
key_password: str = args.key_password
dest: Path = args.dest

repo = AptRepo()

all_packages: dict[str, AptPackage] = {}
for distro_path, distro_args in distros.items():
# normalize the list of packages to set((name, component))
normalized: set[Tuple[str, str]] = set()
if distro_args.packages is not None:
for package in distro_args.packages:
name, _, component = package.partition(':')
if len(component) == 0:
component = 'main'
normalized.add((name, component))

print(f'Adding distribution: {distro_path}')
distro = repo.add_distribution(distro_path,
origin=distro_args.origin,
label=distro_args.label,
suite=distro_args.suite,
codename=distro_args.codename,
version=distro_args.version,
description=distro_args.desc)

for name, component in normalized:
repo_object = all_packages.get(name)
if repo_object is None:
print(f'Adding new package: {name}')
repo_object = repo.add_package(name)
all_packages[name] = repo_object
print(f'Assigning package: {name} to component {component}')
repo.assign_package(repo_object, distro, component)


signer = PgpSigner(key_name=key_name, key_pwd=key_password)

repo.export(dest, signer)

return 0


def main():
"""script entry point"""

repo_types: dict[str, _Handler] = {
'alpine': _AlpineHandler(),
'apt': _AptHandler(),
'freebsd': _FreeBSDHandler(),
'pacman': _PacmanHandler(),
'rpm': _RpmHandler(),
}


parser = argparse.ArgumentParser(
prog='repopulator',
description='Populates software repositories',
epilog="Use repopulator TYPE -h to get more help for each type's options"
)
subparsers = parser.add_subparsers(
help='type of repository to create, one of: ' + ', '.join(repo_types),
metavar='TYPE',
dest='repo_key',
required=True
)
for repo_key, handler in repo_types.items():
handler.add_parser(repo_key, subparsers)

args = parser.parse_args()
return repo_types[args.repo_key].handle(args)

if __name__ == '__main__':
sys.exit(main())
18 changes: 18 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024, Eugene Gershnik
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE.txt file or at
# https://opensource.org/licenses/BSD-3-Clause

# pylint: skip-file

import os
Expand Down Expand Up @@ -90,10 +96,22 @@ def pgp_signer():
key_pwd = os.environ['PGP_KEY_PASSWD'],
homedir=os.environ.get('GNUPGHOME'))

@pytest.fixture
def pgp_cmd():
return ['-k', os.environ['PGP_KEY_NAME'], '-w', os.environ['PGP_KEY_PASSWD']]


@pytest.fixture
def pki_signer():
return PkiSigner((Path(os.environ['BSD_KEY'])), os.environ.get('BSD_KEY_PASSWD'))

@pytest.fixture
def pki_cmd():
ret = ['-k', os.environ['BSD_KEY']]
if (pwd := os.environ.get('BSD_KEY_PASSWD')) is not None:
ret += ['-w', pwd]
return ret


@pytest.fixture(scope='session')
def fixed_datetime():
Expand Down
Loading

0 comments on commit a41c5f7

Please sign in to comment.