教你使用mixphp打造多進(jìn)程異步郵件發(fā)送

注意:這個(gè)是 MixPHP V1 的范例

郵件發(fā)送是很常見的需求,由于發(fā)送郵件的操作一般是比較耗時(shí)的,所以我們一般采用異步處理來提升用戶體驗(yàn),而異步通常我們使用消息隊(duì)列來實(shí)現(xiàn)。

成都創(chuàng)新互聯(lián)長(zhǎng)期為上千余家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為麻栗坡企業(yè)提供專業(yè)的成都做網(wǎng)站、成都網(wǎng)站建設(shè)、成都外貿(mào)網(wǎng)站建設(shè),麻栗坡網(wǎng)站改版等技術(shù)服務(wù)。擁有十余年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。

傳統(tǒng) MVC 框架由于缺少多進(jìn)程開發(fā)能力,通常是采用同一個(gè)腳本執(zhí)行多次,產(chǎn)生多個(gè)進(jìn)程的方式,mixphp 封裝了 TaskExecutor 專用于多進(jìn)程開發(fā),用戶能非常簡(jiǎn)單的開發(fā)出功能完善的高可用多進(jìn)程應(yīng)用。

推薦:《PHP視頻教程》

下面演示一個(gè)異步郵件發(fā)送系統(tǒng)的開發(fā)過程,涉及知識(shí)點(diǎn):

異步消息隊(duì)列多進(jìn)程守護(hù)進(jìn)程如何使用消息隊(duì)列實(shí)現(xiàn)異步

PHP 使用消息隊(duì)列通常是使用中間件來實(shí)現(xiàn),常用的消息中間件有:

redisrabbitmqkafka

本次我們選用 redis 來實(shí)現(xiàn)異步郵件發(fā)送,redis 的數(shù)據(jù)類型中有一個(gè) list 類型,可實(shí)現(xiàn)消息隊(duì)列,使用以下命令:

// 入列
$redis->lpush($key, $data);
// 出列
$data = $redis->rpop($key);
// 阻塞出列
$data = $redis->brpop($key, 10);

架構(gòu)設(shè)計(jì)

本實(shí)例由傳統(tǒng) MVC 框架投遞郵件發(fā)送需求,MixPHP 多進(jìn)程執(zhí)行發(fā)送任務(wù)。

郵件發(fā)送庫(kù)選型

以往我們通常使用框架提供的郵件發(fā)送庫(kù),或者網(wǎng)上下載別的用戶分享的庫(kù),composer 出現(xiàn)后,https://packagist.org/ 上有大量?jī)?yōu)質(zhì)的庫(kù),我們只需選擇一個(gè)最好的即可,本例選擇 swiftmailer。

由于發(fā)送任務(wù)是由 MixPHP 執(zhí)行,所以 swiftmailer 是安裝在 MixPHP 項(xiàng)目中,在項(xiàng)目根目錄中執(zhí)行以下命令安裝:

composer require swiftmailer/swiftmailer

生產(chǎn)者開發(fā)

在郵件發(fā)送這個(gè)需求中生產(chǎn)者是指投遞發(fā)送任務(wù)的一方,這一方通常是一個(gè)接口或網(wǎng)頁(yè),這個(gè)部分并不一定需 mixphp 開發(fā),TP、CI、YII 這些都可以,只需在接口或網(wǎng)頁(yè)中把任務(wù)信息投遞到消息隊(duì)列中即可。

在傳統(tǒng) MVC 框架的控制器中增加如下代碼:

通??蚣苤惺褂?redis 會(huì)安裝一個(gè)類庫(kù)來使用,本例使用原生代碼,便于理解。
// 連接
$redis = new \\Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
    throw new \\Exception('Redis Connect Failure');
}
$redis->auth('');
$redis->select(0);
// 投遞任務(wù)
$data = [
    'to'      => ['***@qq.com' => 'A name'],
    'body'    => 'Here is the message itself',
    'subject' => 'The title content',
];
$redis->lpush('queue:email', serialize($data));

通常異步開發(fā)中,投遞完成后就會(huì)立即響應(yīng)一個(gè)消息給用戶,當(dāng)然此時(shí)該任務(wù)并沒有執(zhí)行。

消費(fèi)者開發(fā)

本例我們使用 MixPHP 的多進(jìn)程開發(fā)工具 TaskExecutor 來完成這個(gè)需求,通常使用常駐進(jìn)程來處理隊(duì)列的消費(fèi),所以我們使用 TaskExecutor 的 TYPE_DAEMON 類型,MODE_PUSH 模式。

TaskExecutor 的 MODE_PUSH 模式有二種進(jìn)程:

左進(jìn)程:負(fù)責(zé)從消息隊(duì)列取出任務(wù)數(shù)據(jù),投放給中進(jìn)程。

中進(jìn)程:負(fù)責(zé)執(zhí)行郵件發(fā)送任務(wù)。

PushCommand.php 代碼如下:

<?php

namespace apps\\daemon\\commands;

use mix\\console\\ExitCode;
use mix\\facades\\Input;
use mix\\facades\\Redis;
use mix\\task\\CenterProcess;
use mix\\task\\LeftProcess;
use mix\\task\\TaskExecutor;

/**
 * 推送模式范例
 * @author 劉健 <coder.liu@qq.com>
 */
class PushCommand extends BaseCommand
{

    // 配置信息
    const HOST = 'smtpdm.aliyun.com';
    const PORT = 465;
    const SECURITY = 'ssl';
    const USERNAME = '****@email.***.com';
    const PASSWORD = '****';

    // 初始化事件
    public function onInitialize()
    {
        parent::onInitialize(); // TODO: Change the autogenerated stub
        // 獲取程序名稱
        $this->programName = Input::getCommandName();
        // 設(shè)置pidfile
        $this->pidFile = "/var/run/{$this->programName}.pid";
    }

    /**
     * 獲取服務(wù)
     * @return TaskExecutor
     */
    public function getTaskService()
    {
        return create_object(
            [
                // 類路徑
                'class'         => 'mix\\task\\TaskExecutor',
                // 服務(wù)名稱
                'name'          => "mix-daemon: {$this->programName}",
                // 執(zhí)行類型
                'type'          => \\mix\\task\\TaskExecutor::TYPE_DAEMON,
                // 執(zhí)行模式
                'mode'          => \\mix\\task\\TaskExecutor::MODE_PUSH,
                // 左進(jìn)程數(shù)
                'leftProcess'   => 1,
                // 中進(jìn)程數(shù)
                'centerProcess' => 5,
                // 任務(wù)超時(shí)時(shí)間 (秒)
                'timeout'       => 5,
            ]
        );
    }

    // 啟動(dòng)
    public function actionStart()
    {
        // 預(yù)處理
        if (!parent::actionStart()) {
            return ExitCode::UNSPECIFIED_ERROR;
        }
        // 啟動(dòng)服務(wù)
        $service = $this->getTaskService();
        $service->on('LeftStart', [$this, 'onLeftStart']);
        $service->on('CenterStart', [$this, 'onCenterStart']);
        $service->start();
        // 返回退出碼
        return ExitCode::OK;
    }

    // 左進(jìn)程啟動(dòng)事件回調(diào)函數(shù)
    public function onLeftStart(LeftProcess $worker)
    {
        try {
            // 模型內(nèi)使用長(zhǎng)連接版本的數(shù)據(jù)庫(kù)組件,這樣組件會(huì)自動(dòng)幫你維護(hù)連接不斷線
            $queueModel = Redis::getInstance();
            // 保持任務(wù)執(zhí)行狀態(tài),循環(huán)結(jié)束后當(dāng)前進(jìn)程會(huì)退出,主進(jìn)程會(huì)重啟一個(gè)新進(jìn)程繼續(xù)執(zhí)行任務(wù),這樣做是為了避免長(zhǎng)時(shí)間執(zhí)行內(nèi)存溢出
            for ($j = 0; $j < 16000; $j++) {
                // 從消息隊(duì)列中間件阻塞獲取一條消息
                $data = $queueModel->brpop('queue:email', 10);
                if (empty($data)) {
                    continue;
                }
                list(, $data) = $data;
                // 將消息推送給中進(jìn)程去處理,push有長(zhǎng)度限制 (https://wiki.swoole.com/wiki/page/290.html)
                $worker->push($data, false);
            }
        } catch (\\Exception $e) {
            // 休息一會(huì),避免 CPU 出現(xiàn) 100%
            sleep(1);
            // 拋出錯(cuò)誤
            throw $e;
        }
    }

    // 中進(jìn)程啟動(dòng)事件回調(diào)函數(shù)
    public function onCenterStart(CenterProcess $worker)
    {
        // 保持任務(wù)執(zhí)行狀態(tài),循環(huán)結(jié)束后當(dāng)前進(jìn)程會(huì)退出,主進(jìn)程會(huì)重啟一個(gè)新進(jìn)程繼續(xù)執(zhí)行任務(wù),這樣做是為了避免長(zhǎng)時(shí)間執(zhí)行內(nèi)存溢出
        for ($j = 0; $j < 16000; $j++) {
            // 從進(jìn)程消息隊(duì)列中搶占一條消息
            $data = $worker->pop();
            if (empty($data)) {
                continue;
            }
            // 處理消息
            try {
                // 處理消息,比如:發(fā)送短信、發(fā)送郵件、微信推送
                var_dump($data);
                $ret = self::sendEmail($data);
                var_dump($ret);
            } catch (\\Exception $e) {
                // 回退數(shù)據(jù)到消息隊(duì)列
                $worker->rollback($data);
                // 休息一會(huì),避免 CPU 出現(xiàn) 100%
                sleep(1);
                // 拋出錯(cuò)誤
                throw $e;
            }
        }
    }

    // 發(fā)送郵件
    public static function sendEmail($data)
    {
        // Create the Transport
        $transport = (new \\Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
            ->setUsername(self::USERNAME)
            ->setPassword(self::PASSWORD);
        // Create the Mailer using your created Transport
        $mailer = new \\Swift_Mailer($transport);
        // Create a message
        $message = (new \\Swift_Message($data['subject']))
            ->setFrom([self::USERNAME => '**網(wǎng)'])
            ->setTo($data['to'])
            ->setBody($data['body']);
        // Send the message
        $result = $mailer->send($message);
        return $result;
    }

}

測(cè)試

1.在 shell 中啟動(dòng) push 常駐程序。
[root@localhost bin]# ./mix-daemon push start
mix-daemon 'push' start successed.
1.調(diào)用接口往消息隊(duì)列投放任務(wù)。

此時(shí) shell 終端將打?。?/p>

成功收到測(cè)試郵件:

MixPHP

GitHub: https://github.com/mix-php/mix
官網(wǎng):http://www.mixphp.cn/

標(biāo)題名稱:教你使用mixphp打造多進(jìn)程異步郵件發(fā)送
文章分享:http://muchs.cn/article40/cgpheo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供ChatGPT、外貿(mào)網(wǎng)站建設(shè)、微信小程序服務(wù)器托管、網(wǎng)站設(shè)計(jì)公司全網(wǎng)營(yíng)銷推廣

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)