Skip to content

系统使用 think-queue 作为消息队列组件

消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送,可进行发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等操作。

队列配置

消息队列配置文件存放在config/queue.php文件内,建议使用redis作为消息队列驱动

配置文件

<?php

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', ''),
            'port'       => env('redis.port', 6379),
            'password'   => env('redis.password', ''),
            'select'     => env('redis.select', 0),
            'timeout'    => env('redis.timeout', 0),
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

系统自带队列

// 小程序模版消息队列
app/common/jobs/notice/AppletTemplateJob.php

// 发送邮件队列
app/common/jobs/notice/MailJob.php

// 发送短信队列
app/common/jobs/notice/SmsJob.php

// 发送微信模板消息队列
app/common/jobs/notice/WechatTemplateJob.php

// 导出excel队列
app/common/jobs/ExportJob.php

添加新队列

队列文件存放在app/common/jobs目录下。
继承框架核心BaseJob类,必须实现doJob和notify方法,可设置队列相关信息。
使用JobTrait文件内置方法,可实现添加队列、定时队列和延时队列。
可调用QueueService文件的addQueueLog方法添加队列日志。

BaseJob内置方法,文件位置:tmcore/base/BaseJob.php

/**
 * 执行任务(默认执行)
 * @param Job $job 队列对象
 * @param array $data queue-任务名 data-传递参数 retry_num-重试次数
 */
public function fire(Job $job, $data):void

/**
 * 执行成功
 * @param array $data
 * @param string $msg
 * @param int $code
 * @return array
 */
public function success($data = [], $msg = 'success', $code = 0)

/**
 * 执行失败
 * @param string $msg
 * @param int $code
 * @return array
 */
public function error($msg = '', $code = -1)

JobTrait内置方法,文件位置:tmcore/traits/JobTrait.php,静态调用方法

/**
 * 添加队列
 * @param array|string $data 数据
 * @param string $queue_name 队列名称
 * @param string $class_name 执行类名
 * @return string 队列job_id
 */
public static function pushJob($data, $queue_name = null, $class_name = '')

/**
* 添加定时队列
* @param int $timeStamp 执行的时间戳
* @param array|string $data 数据
* @param string $queue_name 队列名称
* @param string $class_name 执行类名
* @return string 队列job_id
*/
public static function pushTimeJob($timeStamp, $data, $queue_name = null, $class_name = '')

/**
 * 添加延时队列
 * @param integer $delay 延时时间 秒
 * @param array|string $data 数据
 * @param string $queue_name 队列名称
 * @param string $class_name 执行类名
 * @return string 队列job_id
 */
public static function pushDelayJob($delay, $data, $queue_name = null, $class_name = '')

以新增OrderJob队列文件为例

<?php

namespace app\common\jobs;

use tmcore\services\QueueService;
use think\facade\Log;
use tmcore\base\BaseJob;
use tmcore\traits\JobTrait;

/**
 * 订单队列
 */
class OrderJob extends BaseJob
{
    use JobTrait;

    public function doJob($data) {
        try {
            // 执行任务

            return $this->success();
        } catch (\Exception $e) {
            return $this->error('执行任务失败,' . ' 行号:' . $e->getLine() . '文件:' . $e->getFile() . '错误信息:' . $e->getMessage());
        }
    }

    public function notify($data) {
        try {
            // 添加消息队列日志
            (new QueueService)->addQueueLog($data);
        } catch (\Exception $e) {
            Log::error('执行任务失败,' . ' 行号:' . $e->getLine() . '文件:' . $e->getFile() . '错误信息:' . $e->getMessage());
        }
    }
}

调用示例

以调用OrderJob队列为例

use app\common\jobs\OrderJob;

// 添加队列,立即执行
OrderJob::pushJob([
    'job' => 'orderJob1',
    'data' => [
        'order_no' => '9527418306360028'
    ]
]);

$time = time() + 3600;
// 一小时后执行
OrderJob::pushTimeJob(time, [
    'job' => 'orderJob2',
    'retry_num' => 3, // 重试次数
    'data' => [
        'order_no' => '9527418306360028'
    ]
]);

// 延时30秒执行
OrderJob::pushDelayJob(30, [
    'job' => 'orderJob3',
    'data' => [
        'order_no' => '9527418306360028'
    ]
]);

Released under the Apache-2.0 License.