Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【功能请求】增加CDN和宝塔Nginx自动部署 #111

Open
fcwys opened this issue Jun 26, 2024 · 1 comment
Open

【功能请求】增加CDN和宝塔Nginx自动部署 #111

fcwys opened this issue Jun 26, 2024 · 1 comment

Comments

@fcwys
Copy link

fcwys commented Jun 26, 2024

1、支持阿里云、腾讯云、多吉云CDN自动部署(可参考这个),可选指定域名启用或禁用自动更新;
2、支持宝塔面板Nginx自动部署,可选指定域名启用或禁用自动更新(通过API);
非常感谢!!!

附:个人实现的宝塔获取站点列表、上传证书、重启Nginx代码,希望能帮上忙:

# -*- coding: utf-8 -*-
import hashlib
import time
import requests
import prettytable


# 宝塔面板操作类
class BtPanel:
    __BTURL = ''
    __APIKEY = ''
    __REQ = requests.session()

    # 初始化宝塔面板
    def __init__(self, host: str, apisk: str):
        '''
        :param host: 宝塔面板地址(末尾不加/)
        :param apisk: 宝塔面板API密钥
        '''
        self.__BTURL = host
        self.__APIKEY = apisk

    # 计算MD5
    def __GetMD5(self, s: str):
        '''
        计算字符串的MD5值
        :param s: 待计算的字符串
        :return: MD5值
        '''
        m = hashlib.md5()
        m.update(s.encode('utf-8'))
        return m.hexdigest()

    # 签名计算
    def __GetToken(self):
        request_time = int(time.time())    # 获取请求时间戳
        request_token = self.__GetMD5(str(request_time) + '' + self.__GetMD5(self.__APIKEY)),   # 生成请求签名
        return {'request_time': request_time, 'request_token': request_token}

    # 获取站点列表
    def GetSites(self, showlog=True):
        '''
        获取宝塔面板站点列表
        :showlog: 是否输出结果
        :return: 站点列表数据
        '''
        if showlog:
            print('\n### 获取站点列表...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'p': 1,
            'limit': 100,
            'order': 'id'
        }
        res = self.__REQ.post(url=self.__BTURL + '/data?action=getData&table=sites', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 获取站点列表失败:', res['msg'])
            return False
        # 使用prettytable输出站点列表
        if showlog:
            tb = prettytable.PrettyTable()
            tb.field_names = ['网站名', '站点类型', '备注', 'SSL域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '状态']
            tb.align['网站名'] = 'l'
            tb.align['备注'] = 'l'
            tb.align['SSL域名'] = 'l'
            tb.align['证书品牌'] = 'l'
            for site in res['data']:
                site_status = '运行' if site['status'] == '1' else '停止'
                if site['ssl'] == -1:
                    sslinfo = {'notAfter': '-', 'endtime': '-', 'subject': '-', 'issuer': '-'}
                else:
                    sslinfo = {'notAfter': site['ssl']['notAfter'], 'endtime': site['ssl']['endtime'], 'subject': site['ssl']['subject'], 'issuer': site['ssl']['issuer']}
                tb.add_row([site['name'], site['project_type'], site['ps'], sslinfo['subject'], sslinfo['notAfter'], sslinfo['endtime'], sslinfo['issuer'], site_status])
            print(tb)
        return res['data']

    # 设置站点SSL证书
    def SetSSL(self, site_name: str, ssl_cert_content: str, ssl_key_content: str):
        '''
        设置站点SSL证书
        :param site_name: 站点名称(域名)
        :param ssl_cert_content: ssl证书内容
        :param ssl_key_content: ssl私钥内容
        :return: 设置结果
        '''
        print('\n### 设置站点SSL证书...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'type': 0,
            'siteName': site_name,
            'key': ssl_key_content,
            'csr': ssl_cert_content
        }
        res = self.__REQ.post(url=self.__BTURL + '/site?action=SetSSL', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 设置站点SSL证书失败:', res['msg'])
            return False
        print('>>>', res['msg'])    # 输出结果
        return res['status']

    # 获取证书夹列表
    def GetCertList(self, showlog=True):
        '''
        获取证书夹列表
        :showlog: 是否输出结果
        :return: 证书夹列表数据
        '''
        if showlog:
            print('\n### 获取证书夹列表...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'force_refresh': 0    # 0:获取本地证书 1:获取云端证书
        }
        res = self.__REQ.post(url=self.__BTURL + '/ssl?action=get_cert_list', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 获取证书夹列表失败:', res['msg'])
            return False
        # 使用prettytable输出证书夹列表
        if showlog:
            tb = prettytable.PrettyTable()
            tb.field_names = ['域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '可选域名']
            tb.align['域名'] = 'l'
            tb.align['证书品牌'] = 'l'
            tb.align['可选域名'] = 'l'
            for cert in res:
                tb.add_row([cert['subject'], cert['info']['notAfter'], cert['endtime'], cert['info']['issuer'], cert['dns']])
            print(tb)
        return res

    # 删除过期SSL证书
    def DelExpiredSSL(self):
        '''
        删除过期SSL证书
        :showlog: 是否输出结果
        :return: 删除结果
        '''
        print('\n### 删除过期SSL证书...')
        certs = self.GetCertList(showlog=False)    # 获取证书列表
        if not certs:
            return False
        expiredcerts = []    # 过期证书列表
        for cert in certs:
            if cert['endtime'] < 0:    # 证书已过期
                expiredcerts.append(cert['subject'])    # 加入过期证书列表
                tk = self.__GetToken()    # 获取签名
                playload = {
                    'request_time': tk['request_time'],
                    'request_token': tk['request_token'],
                    'local': 1,
                    'ssl_hash': cert['hash']
                }
                res = self.__REQ.post(url=self.__BTURL + '/ssl?action=remove_cloud_cert', data=playload).json()
                # 判断请求是否成功
                if 'status' in res and 'msg' in res and not res['status']:
                    print('>>> 删除过期SSL证书失败:', res['msg'])
                    return False
                print('>>> 过期证书', cert['subject'], res['msg'])
                return res['status']
        if len(expiredcerts) == 0:
            print('>>> 证书夹内未发现过期证书')
        return expiredcerts

    # 重启Nginx
    def RestartNginx(self):
        '''
        重启Nginx
        :return: 重启结果
        '''
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'name': 'nginx',
            'type': 'restart'
        }
        res = self.__REQ.post(url=self.__BTURL + '/system?action=ServiceAdmin', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 重启Nginx失败:', res['msg'])
            return False
        return res.json()['status']

    # 获取系统基本信息
    def GetSystemInfo(self, showlog=True):
        '''
        获取系统基本信息
        :showlog: 是否输出结果
        :return: 系统基本信息
        '''
        if showlog:
            print('\n### 获取系统基本信息...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token']
        }
        res = self.__REQ.post(url=self.__BTURL + '/system?action=GetSystemTotal', data=playload).json()
        # 判断请求是否成功
        # 判断res是否存在status字段,若不存在则说明请求失败
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 获取系统基本信息失败:', res['msg'])
            return False
        res['memUtilization'] = round(res['memRealUsed'] / res['memTotal'], 2) * 100    # 内存使用率
        if showlog:
            print('>>> 面板地址:', self.__BTURL)
            print('>>> 操作系统:', res['system'])
            print('>>> 面板版本:', res['version'])
            print('>>> 运行时间:', res['time'])
            print('>>> CPU使用率:', res['cpuRealUsed'], '%')
            print('>>> 内存使用率:', res['memUtilization'], '%')
            print('>>> 内存总量:', round(res['memTotal'] / 1024, 2), 'GB')
        return res

上述代码已经过测试可行。

@mouday
Copy link
Owner

mouday commented Jun 27, 2024

好的,我研究下

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants