Skip to content

Latest commit

 

History

History
436 lines (361 loc) · 14.1 KB

7.基于Swoole发送邮件.md

File metadata and controls

436 lines (361 loc) · 14.1 KB

基于Swoole发送邮件

主程序

在目录:src/App/下建立Mail.php,用作Swoole服务端主程序。

<?php 
namespace Helloweba\Swoole;

use swoole_server;
use PHPMailer\PHPMailer\PHPMailer;
use PHPMailer\PHPMailer\Exception;

class Mail
{
    protected $serv;
    protected $host = '127.0.0.1';
    protected $port = 9502;
    // 进程名称
    protected $taskName = 'swooleMailer';
    // PID路径
    protected $pidPath = '/run/swooleMail.pid';
    // 设置运行时参数
    protected $options = [
        'worker_num' => 4, //worker进程数,一般设置为CPU数的1-4倍  
        'daemonize' => true, //启用守护进程
        'log_file' => '/data/logs/swoole.log', //指定swoole错误日志文件
        'log_level' => 0, //日志级别 范围是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR
        'dispatch_mode' => 1, //数据包分发策略,1-轮询模式
        'task_worker_num' => 4, //task进程的数量
        'task_ipc_mode' => 3, //使用消息队列通信,并设置为争抢模式
        //'heartbeat_idle_time' => 600, //一个连接如果600秒内未向服务器发送任何数据,此连接将被强制关闭
        //'heartbeat_check_interval' => 60, //启用心跳检测,每隔60s轮循一次
    ];
    // 邮件服务器配置
    protected $mailConfig = [
        'smtp_server' => 'smtp.163.com',
        'username' => '[email protected]',
        'password' => '',// SMTP 密码/口令
        'secure' => 'ssl', //Enable TLS encryption, `ssl` also accepted
        'port' => 465, // tcp邮件服务器端口
    ];
    // 安全密钥
    protected $safeKey = 'MYgGnQE33ytd2jDFADS39DSEWsdD24sK';


    public function __construct($mailConfig, $options = [])
    {
        // 构建Server对象,监听端口
        $this->serv = new swoole_server($this->host, $this->port);

        if (!empty($options)) {
            $this->options = array_merge($this->options, $options);
        }
        $this->serv->set($this->options);

        $this->mailConfig = $mailConfig;

        // 注册事件
        $this->serv->on('Start', [$this, 'onStart']);
        $this->serv->on('Connect', [$this, 'onConnect']);
        $this->serv->on('Receive', [$this, 'onReceive']);
        $this->serv->on('Task', [$this, 'onTask']);  
        $this->serv->on('Finish', [$this, 'onFinish']);
        $this->serv->on('Close', [$this, 'onClose']);

        // 启动服务
        //$this->serv->start();
    }

    protected function init()
    {
        //
    }

    public function start()
    {
        // Run worker
        $this->serv->start();
    }

    public function onStart($serv)
    {
        // 设置进程名
        cli_set_process_title($this->taskName);
        //记录进程id,脚本实现自动重启
        $pid = "{$serv->master_pid}\n{$serv->manager_pid}";
        file_put_contents($this->pidPath, $pid);
    }

    //监听连接进入事件
    public function onConnect($serv, $fd, $from_id)
    {
        $serv->send($fd, "Hello {$fd}!" );
    }

    // 监听数据接收事件
    public function onReceive(swoole_server $serv, $fd, $from_id, $data)
    {
        $res['result'] = 'failed';
        $key = $this->safeKey;

        $req = json_decode($data, true);
        $action = $req['action'];
        $token = $req['token'];
        $timestamp = $req['timestamp'];

        if (time() - $timestamp > 180) {
            $res['code'] = '已超时';
            $serv->send($fd, json_encode($res));
            exit;
        }

        $token_get = md5($action.$timestamp.$key);
        if ($token != $token_get) {
            $res['msg'] = '非法提交';
            $serv->send($fd, json_encode($res));
            exit;
        }

        $res['result'] = 'success';
        $serv->send($fd, json_encode($res)); // 同步返回消息给客户端
        $serv->task($data);  // 执行异步任务

    }

    /**
    * @param $serv swoole_server swoole_server对象
    * @param $task_id int 任务id
    * @param $from_id int 投递任务的worker_id
    * @param $data string 投递的数据
    */
    public function onTask(swoole_server $serv, $task_id, $from_id, $data)
    {
        $res['result'] = 'failed';
        $req = json_decode($data, true);
        $action = $req['action'];
        echo date('Y-m-d H:i:s')." onTask: [".$action."].\n";
        switch ($action) {
            case 'sendMail': //发送单个邮件
                $mailData = [
                    'emailAddress' => '[email protected]', //接收方,改成自己的邮箱可以测试接收邮件
                    'subject' => 'swoole实验室',
                    'body' => '测试This is the HTML message body <b>in bold!</b>,<br/>欢迎访问<a href="https://www.helloweba.net/">www.helloweba.net</a>',
                    'attach' => '/home/swoole/public/a.jpg'
                ];
                $this->sendMail($mailData);
                break;
            
            default:
                break;
        }
    }


    /**
    * @param $serv swoole_server swoole_server对象
    * @param $task_id int 任务id
    * @param $data string 任务返回的数据
    */
    public function onFinish(swoole_server $serv, $task_id, $data)
    {
        //
    }


    // 监听连接关闭事件
    public function onClose($serv, $fd, $from_id) {
        echo "Client {$fd} close connection\n";
    }

    public function stop()
    {
        $this->serv->stop();
    }

    private function sendMail($mail_data = [])
    {
        $mail = new PHPMailer(true);
        try {
            $mailConfig = $this->mailConfig;
            //$mail->SMTPDebug = 2;        // 启用Debug
            $mail->isSMTP();   // Set mailer to use SMTP
            $mail->Host = $mailConfig['smtp_server'];  // SMTP服务
            $mail->SMTPAuth = true;                  // Enable SMTP authentication
            $mail->Username = $mailConfig['username'];    // SMTP 用户名
            $mail->Password = $mailConfig['password'];     // SMTP 密码/口令
            $mail->SMTPSecure = $mailConfig['secure'];     // Enable TLS encryption, `ssl` also accepted
            $mail->Port = $mailConfig['port'];    // TCP 端口
            $mail->CharSet  = "UTF-8"; //字符集
            $mail->Encoding = "base64"; //编码方式
            //Recipients
            $mail->setFrom($mailConfig['username'], 'Helloweba'); //发件人地址,名称
            $mail->addAddress($mail_data['emailAddress'], '亲');     // 收件人地址和名称
            //$mail->addCC('[email protected]'); // 抄送
            //Attachments
            if (isset($mail_data['attach'])) {
                $mail->addAttachment($mail_data['attach']);         // 添加附件
            } 
            //$mail->addAttachment('/tmp/image.jpg', 'new.jpg');    // Optional name
            //Content
            $mail->isHTML(true);                                  // Set email format to HTML
            $mail->Subject = $mail_data['subject'];
            $mail->Body    = $mail_data['body'];

            $mail->send();
            return true;
        } catch (\Exception $e) {
            echo 'Message could not be sent. Mailer Error: '. $mail->ErrorInfo;
            return false;
        }
    }
}

​ Swoole启动后,服务端会监听数据接收事件onReceive(),当接收到客户端发来的数据时会进行相应的处理。我们在这里对源数据进行验证,然后作为任务投递给onTask()sendMail()是使用phpmailler来发送邮件的

运行服务端

​ 在public/目录下建立mailServer.php,代码如下:

<?php 
require dirname(__DIR__) . '/vendor/autoload.php';

use Helloweba\Swoole\Mail;

$config = [
    'smtp_server' => 'smtp.163.com', //邮件服务器
    'username' => '[email protected]', //这里是用作发送方的邮箱号
    'password' => 'xxxxx',// SMTP 密码/口令
    'secure' => 'ssl', //Enable TLS encryption, `ssl` also accepted
    'port' => 465, // tcp邮件服务器端口
];
$server = new Mail($config);
$server->start();

​ 你可以注册一个163邮箱,然后开通smtp功能。我DEMO中使用的是163邮箱发邮件发多了被封号了,所以在线演示demo没上传了。配置好邮件服务器参数后,运行:

php mailServer.php

运行客户端

​ 在public/目录下新建mailClient.php,代码如下:

<?php 
class Client
{
    private $client;
    
    public function __construct() {
        $this->client = new swoole_client(SWOOLE_SOCK_TCP);
    }

    public function connect() {
        if( !$this->client->connect("127.0.0.1", 9502 , 1) ) {
            echo "Error: {$this->client->errMsg}[{$this->client->errCode}]\n";
        }
        
        $action = 'sendMail';
        $time = time();
        $key = 'MYgGnQE33ytd2jDFADS39DSEWsdD24sK';
        $token = md5($action.$time.$key);
        $data = [
            'action' => $action,
            'token' => $token,
            'timestamp' => $time
        ];
        $msg = json_encode($data);

        $this->client->send( $msg );
        $message = $this->client->recv();
        echo "Get Message From Server:{$message}\n";
    }
}

$client = new Client();
$client->connect();

运行命令启动客户端:

php mailClient.php

此时在命令行窗口会返回如下信息:

[root@localhost public]# php mailClient.php 
Get Message From Server:{"result":"success"}

这样就已经执行邮件发送任务

利用队列实现批量发送邮件

​ 编辑src/App/Mail.php文件代码,在public function onTask()方法中增加批量队列发送邮件的代码:

 public function onTask(swoole_server $serv, $task_id, $from_id, $data)
 {
        $res['result'] = 'failed';
        $req = json_decode($data, true);
        $action = $req['action'];
        echo date('Y-m-d H:i:s')." onTask: [".$action."].\n";
        switch ($action) {
            case 'sendMail': //发送单个邮件
                $mailData = [
                    'emailAddress' => '[email protected]', //接收方,改成自己的邮箱可以测试接收邮件
                    'subject' => 'swoole实验室',
                    'body' => '测试This is the HTML message body.',
                    'attach' => '/home/swoole/public/a.jpg'
                ];
                $this->sendMail($mailData);
                break;

            case 'sendMailQueue': // 批量队列发送邮件
                $this->sendMailQueue();
                break;
            
            default:
                break;
        }
 }

​ 由于发送的邮件比较多,我们把邮件列表事先保存在Redis队列中。我们知道Redis的使用场景很多,其中就可以用它来做简单的队列。

​ 我们在任务中调用了sendMailQueue()方法,继续在Mail.php中添加:

 // 邮件发送队列
    private function sendMailQueue()
    {
        $redis = new \Redis();
        $redis->connect('127.0.0.1', 6379);
       
        $password = '123';
        $redis->auth($password);

        swoole_timer_tick(1000, function($timer) use ($redis) { // 启用定时器,每1秒执行一次
            $value = $redis->lpop('mailerlist');
            if($value){
                //echo '获取redis数据:' . $value;
                $json = json_decode($value, true);
                $start = microtime(true);
                $rs = $this->sendMail($json);
                $end = microtime(true);
                if ($rs) {
                    echo '发送成功!'.$value.', 耗时:'. round($end - $start, 3).'秒'.PHP_EOL;
                } else { // 把发送失败的加入到失败队列中,人工处理
                    $redis->rpush("mailerlist_err", $value);
                }
            }else{
                swoole_timer_clear($timer); // 停止定时器
                echo "Emaillist出队完成";
            }
        });
    }

​ 上述代码中,先尝试连接Redis,然后使用Swoole的swoole_timer_tick()函数,它是个定时器,这个函数跟js的interval()函数一样,意思是每隔一定时间执行一次,它可以定义毫秒级粒度。很显然,上述代码中,每隔1000毫秒(1秒)从Redis队列mailerlist中取出一条,即一个邮件对象,然后执行发送邮件sendMail(),当发送完所有邮件后,使用swoole_timer_clear()关闭定时器即可。定时器的间隔时间可以调整。

​ 在客户端,我们先往Redis队列里添加邮件内容,然后向服务端发起sendMailQueue批量发邮件指令。

<?php 
class Client
{
    private $client;
    
    public function __construct() {
        $this->client = new swoole_client(SWOOLE_SOCK_TCP);
    }

    public function connect() {
        if( !$this->client->connect("127.0.0.1", 9502 , 1) ) {
            echo "Error: {$this->client->errMsg}[{$this->client->errCode}]\n";
        }
        
        $action = 'sendMailQueue';
        $time = time();
        $key = 'MYgGnQE33ytd2jDFADS39DSEWsdD24sK';
        $token = md5($action.$time.$key);
        $data = [
            'action' => $action,
            'token' => $token,
            'timestamp' => $time
        ];
        $msg = json_encode($data);

        $this->client->send( $msg );
        $message = $this->client->recv();
        echo "Get Message From Server:{$message}\n";
    }
}

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$password = '123';
$redis->auth($password);

$arr = [];

$arr[0] = [
    'subject' => '国庆大酬宾,全场1折',
    'emailAddress' => '[email protected]',
    'body' => '您好,国庆期间大酬宾,全场所有商品统统1折甩卖。'
];
$arr[1] = [
    'subject' => '注册会员送100金币',
    'emailAddress' => '[email protected]',
    'body' => '邮件内容'
];
$arr[2] = [
    'subject' => '国庆大酬宾,全场1折',
    'emailAddress' => '[email protected]',
    'body' => '邮件内容2'
];
      
foreach ($arr as $k=>$v) {
    $redis->rpush("mailerlist", json_encode($v, JSON_UNESCAPED_UNICODE));
}

$client = new Client();
$client->connect();

​ 本文中使用了redis作为简单队列,你也可以使用复杂点的队列rabbitmq。你也可以使用Crontab来做定时任务,不过它最小粒度是分钟级别的。