首页 > 文章详情 > Beanstalkd 消息队列编译安装及使用

Beanstalkd 消息队列编译安装及使用

原创 YuanDong 2019-10-08 浏览量(53)

Beanstalkd 介绍

Beanstalkd 是一个轻量级的内存型队列,利用了和 Memcache 类似的协议。依赖 libevent 单线程事件分发机制, 可以部署多个实例,但是高并发支持还是不太友好;

编译安装 Beanstalkd

  • 下载源码包至 /usr/local/src/ (建议去官网下载)
官网下载   :https://beanstalkd.github.io/download.html
wget下载 :wget https://github.com/beanstalkd/beanstalkd/archive/v1.11.tar.gz
  • 解压源码包
tar -zxvf v1.11.tar.gz
  • 切换到源码包路径
cd beanstalkd-1.11
  • 编译安装
make install  (默认安装路径  /usr/local/bin/beanstalkd )
  • 创建beanstalkd用户和binlog目录
groupadd beanstalkd
useradd -s /sbin/nologin -g beanstalkd beanstalkd
mkdir -p /var/lib/beanstalkd/binlog
chown -R beanstalkd:beanstalkd /var/lib/beanstalkd/binlog

Beanstalkd 基础命令

/usr/local/bin/beanstalkd -h
Use: /usr/local/bin/beanstalkd [OPTIONS]

Options:
-b  开启binlog,断电后重启会自动恢复任务。
-f  MS fsync最多每MS毫秒
-F  从不fsync(默认)
-l  ADDR侦听地址(默认为0.0.0.0)
-p  端口侦听端口(默认为11300)
-u  USER成为用户和组
-z  BYTES设置最大作业大小(以字节为单位)(默认值为65535)
-s  BYTES设置每个wal文件的大小(默认为10485760) (将被舍入到512字节的倍数)
-c  压缩binlog(默认)
-n  不要压缩binlog
-v  显示版本信息
-V  增加冗长度
-h  显示这个帮助

常见启动如下:
beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd/binlog &

PHP中使用介绍

安装Pheanstalk类

首先安装composer
curl -sS https://getcomposer.org/installer | php
mv composer.phar /usr/local/bin/composer

安装PHP依赖包
composer require pda/pheanstalk ~4.0

使用方法

<?php
require './vendor/autoload.php';
use Pheanstalk\Pheanstalk;

//连接队列
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
**** 维护方法 ****
//查看状态方法;
$pheanstalk->stats();
//目前存在的管道
$pheanstalk->listTubes();
//目前监听的管道
$pheanstalk->listTubesWatched();
//管道的状态
$pheanstalk->statsTube($tubeName);
//指定使用的管道
$pheanstalk->useTube($tubeName);
//查看任务的详细信息
$pheanstalk->statsJob($job);
//通过任务ID获取任务
$pheanstalk->peek($jobID);
**** 生产者方法 ****
//往管道中写入数据
//put 任务 方式一; 返回新 job 的任务标识,整型值;
$pheanstalk->useTube('test')->put(
    '任务内容', 
     23,    //任务的优先级, 默认为 1024
     0,     //不等待直接放到ready队列中.
     60     //处理任务的时间(单位为秒)
);

//put 任务 方式二; 返回新 job 的任务标识,整型值;
$pheanstalk->putInTube(
    'test', // 管道名称
    '任务内容', 
     23, // 任务的优先级, 默认为 1024
     0, // 不等待直接放到ready队列中. 如值为 60 表示 60秒;
     60 // 处理任务的时间(单位为秒)
);

// 给管道里所有新任务设置延迟
$pheanstalk->pauseTube('test', 30);

// 取消管道延迟
$pheanstalk->resumeTube('test');
**** 消费者方法 ****

//监听管道,可以同时监听多个管道
$pheanstalk->watch();

//不监听管道
$pheanstalk->ignore();

//以阻塞方式监听管道,获取任务
$pheanstalk->reserve();
$pheanstalk->reserveFromTube();

//把任务重新放回管道
$pheanstalk->release();

//把任务预留
$pheanstalk->bury() ;

//把预留任务读取出来
$pheanstalk->peekBuried() ;

//把buried状态的任务设置成ready
$pheanstalk->kickJob();

//批量把buried状态的任务设置成ready
$pheanstalk->kick();

//把准备好的任务读取出来
$pheanstalk->peekReady();

//把延迟的任务读取出来
$pheanstalk->peekDelayed();

//给管道设置延迟
$pheanstalk->pauseTube();

//取消管道延迟
$pheanstalk->resumeTube();

//让任务重新计算ttr时间,给任务续命
$pheanstalk->touch();
--- 生产者使用案例 ---
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$data = [
    'order_id'=>1,
    'order_no'=>'20190908111231231'
];

//$jobID  添加成功后返回任务id
$jobID = $pheanstalk->useTube('test')->put(
    json_encode($data),
    23,    //任务的优先级, 默认为 1024
    0,     //不等待直接放到ready队列中.
    60     //处理任务的时间(单位为秒)
);

//通过任务ID获取任务
$job = $pheanstalk->peek($jobID);

//查看任务的详细信息
$jobInfo = $pheanstalk->statsJob($job);
--- 消费者使用案例 --- 
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$useTube = 'test';
while (true){
    $job = $pheanstalk->watch($useTube)->ignore('default')->reserve();
    if ($data = $job->getData()) {
       $data = json_decode($data,true);
       if(!$group->groupOvertime($data)){
             $pheanstalk->release($job);
       }else{
             $pheanstalk->delete($job);
       }
    }
}

热门评论 (0)

网友评论 0 条评论 / 0 人参与