Skip to content

php笔记(一):tp6消息队列think-queue

图片

目前正在做一个客服系统,作为一个前端开发者,没有怎么用过消息队列,所以了解了一下

消息队列和前面写过的一个vue跨组件的通信类非常相似,用的发布订阅模式实现

think-queue是tp官方提供的一个消息队列服务,使用也非常方便

官方地址:https://github.com/top-think/think-queue

安装

php
composer require topthink/think-queue
composer require topthink/think-queue

安装成功后,会在config配置文件夹下自动生成配置文件queue.php

图片

打开配置文件

php
return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];
return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

因为我这边是用的redis,所以修改默认使用redis驱动

开始使用

app文件夹下新建job文件夹,里面放的是用来执行消息队列的文件

图片

我这是一个客服系统,所以,job文件夹下,再创建一个chat.php文件,用来执行消息推送任务

php
namespace app\job;

use think\queue\Job;

class Job{
    public function fire(Job $job, $data){
             //....这里执行具体的任务 
            $jobRes = $this->doChat($data);
            //判断是否执行成功
            if($jobRes){
                 //如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
                $job->delete();
                return;
            }else{
                echo "推送失败";
                //检查任务重试次数,超过3次未成功,删除放弃
                if($job->attempts() > 3){
                    $job->delete();
                }
          }
          
    }
    
    public function failed($data){
    
        // ...任务达到最大重试次数后,失败了
    }
    // 消息队列执行方法
    public function doChat($data)
{
        //具体执行的任务
         $sendRes = send($data);
        // 这里的判断条件以具体业务是否执行成功进行判断
        if($sendRes){
           return true;
        }else{
           return false;
        }
    }
}
namespace app\job;

use think\queue\Job;

class Job{
    public function fire(Job $job, $data){
             //....这里执行具体的任务 
            $jobRes = $this->doChat($data);
            //判断是否执行成功
            if($jobRes){
                 //如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
                $job->delete();
                return;
            }else{
                echo "推送失败";
                //检查任务重试次数,超过3次未成功,删除放弃
                if($job->attempts() > 3){
                    $job->delete();
                }
          }
          
    }
    
    public function failed($data){
    
        // ...任务达到最大重试次数后,失败了
    }
    // 消息队列执行方法
    public function doChat($data)
{
        //具体执行的任务
         $sendRes = send($data);
        // 这里的判断条件以具体业务是否执行成功进行判断
        if($sendRes){
           return true;
        }else{
           return false;
        }
    }
}

现在再建一个发布任务的文件Queue.php文件,放在你具体的项目下,比如我:要放到kefu项目目录下

图片

官方文档

图片

然后打开Queue文件,开始写代码

php
<?php 
//命名空间一定要写对
namespace app\kefu;
//引用Queue,因为和当前类重名了,所以起了一个别名
use think\facade\Queue as Queues;

class Queue{
    //具体执行的方法,改成静态方法,方便调用
    public static function chatHandle($data){
        //jobDoChat是之前创建的job文件夹下Chat类的路径@后面跟的是具体执行的方法
        $jobDoChat = "app\job\Chat@fire";
        //消息队列的名字,
        $jobQueueName = 'chat_job_queue';
        //Queue::push是将job执行任务的方法和传过来数据以及消息队列名字一起添加到消息队列,push是立即执行还有一个延迟执行later
        $isPushed = Queues::push($jobDoChat,$data,$jobQueueName);
        //然后判断是否加入消息队列成功
        if ($isPushed !== false) {
            // 成功之后的业务
            echo '队列加入成功';
        } else {
            // 失败之后的业务
            echo '队列加入失败';
        }

    }
}
 ?>
<?php 
//命名空间一定要写对
namespace app\kefu;
//引用Queue,因为和当前类重名了,所以起了一个别名
use think\facade\Queue as Queues;

class Queue{
    //具体执行的方法,改成静态方法,方便调用
    public static function chatHandle($data){
        //jobDoChat是之前创建的job文件夹下Chat类的路径@后面跟的是具体执行的方法
        $jobDoChat = "app\job\Chat@fire";
        //消息队列的名字,
        $jobQueueName = 'chat_job_queue';
        //Queue::push是将job执行任务的方法和传过来数据以及消息队列名字一起添加到消息队列,push是立即执行还有一个延迟执行later
        $isPushed = Queues::push($jobDoChat,$data,$jobQueueName);
        //然后判断是否加入消息队列成功
        if ($isPushed !== false) {
            // 成功之后的业务
            echo '队列加入成功';
        } else {
            // 失败之后的业务
            echo '队列加入失败';
        }

    }
}
 ?>

然后在使用的控制器里引用发布任务的这个文件

php
use app\kefu\Queue;
use app\kefu\Queue;

然后在方法中使用

php
// 接收用户消息
public function send(Request $request){
    $data = $request->param();
    //把需要执行数据传参到发布任务执行方法里
    Queue::chatHandle($msgData);
}
// 接收用户消息
public function send(Request $request){
    $data = $request->param();
    //把需要执行数据传参到发布任务执行方法里
    Queue::chatHandle($msgData);
}

这样,当有任务的时候,就会通过发布任务类,push到消息队列里,同时触发job下的执行任务类,对数据执行对应的方法

现在只是写完逻辑部分,还需要开通监听和执行的命令

命令行打开到目录下,运行监听

php
php think queue:listen
php think queue:listen

//发布任务类,有写当前消息队列的名字,也可以像下面一样直接执行 php think queue:work --queue chat_job_queue 有一个问题,本地测试的时候。中午吃完饭,回来监听就掉线了。没有常驻进程

官方给出的方案是:配合supervisor使用即可

图片

如果和我一样使用redis缓存消息队列数据,使用前redis一定要确保安装好。

本文到此结束。

反馈信息

INFO

邮箱: open_teams@163.com