一:概要

《RabbitMQ实战指南》中第四章「解决Rabbit相关问题」中有关于日志采集器文中是使用python实现,本文使用php的实现以及消息采用protobuf压缩。中间使用到的是Topic交换机。

二:环境准备

  • RabbitMQ
  • PHP-Protobuf
  • Protoc

三:Protoc编译消息类

假设我们中间传输的消息类为LogProto,编写以下logMessage.proto文件

// logMessage.proto

syntax = "proto3";
message LogProto {
    string time = 1;  //日志生成时间
    string trace = 2; //日志附加内容
    string level = 3; //日志级别
}

生成PHP类

$ php ~/php-protobuf-master/protobuf-gen-php.php logMessage.proto

现在我们目录有两个文件

  • logMessage.proto (完成使命了)
  • LogProto.php(需要引入的消息类)

四:编写配置文件 (为了偷懒)

<?php
// baseConfig.php

date_default_timezone_set('Asia/Shanghai');
// AMQP连接配置
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login'=>'user',
    'password'=>'user',
    'vhost'=>'/'
);
// 消息反序列化
function protobuf_decode($packed, $object){
    $obj = null;
    try{
        $obj = new $object();
        $obj->parseFromString($packed);
    }catch(Exception $e){
        die($e->getMessage());
    }
    return $obj;
}
// 消息序列化
function protobuf_encode($object){
    return $object->serializeToString();
}

五:编写日志消息消费者

其中消费者最主要的就是

  • 定义什么类型的交换机
  • 定义好队列
  • 将交换机与队列通过路由键绑定
  • 将队列的消息进行消费,并对处理好的消息进行ack应答
<?php
// LogConsumer.php

require './baseConfig.php';
require './LogProto.php';
// 交换机名称
$exchange_name = 'log_protobuf';
// 队列名称
$queue_name = 'log_protobuf_topic';
// 路由键
$routing_key = '*.log';

// 连接AMQP
$amqp_conn = new AMQPConnection($conn_args);
$amqp_conn->connect() ? :die('cannot connect broker');

// 获取信道
$channel = new AMQPChannel($amqp_conn);

// 定义Topic交换机
$exchange = new AMQPExchange($channel);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setName($exchange_name);
$exchange->declareExchange();

// 定义队列
$queue = new AMQPQueue($channel);
$queue->setName($queue_name);
$queue->declareQueue();

// 绑定
$queue->bind($exchange_name, $routing_key);

while(true){
    $queue->consume('consumer');
}
$amqp_conn->disconnect();

// 消费Handler
function consumer($envelop, $queue){
    // 将传输的消息进行反序列成我们想要的LogProto类
    $logProto = protobuf_decode($envelop->getBody(), LogProto::class);
    echo "receive>> {$logProto->getTime()}({$logProto->getLevel()}): {$logProto->getTrace()}".PHP_EOL;
    // ack应答
    $queue->ack($envelop->getDeliveryTag());
}

六:编写warn日志生产者

其中生产者主要就是生产我们所需的消息,然后发布到对应的交换机即可。路由键「warn.log」可以被Topic模式的交换机路由键「*.log」匹配,如果前面有多个参数,日志消费者的路由键可以使用哈希模式「#.log」

<?php
// WarnProducer.php

require './baseConfig.php';
require './LogProto.php';

$exchange_name = 'log_protobuf';
$routing_key = 'warn.log';

$amqp_conn = new AMQPConnection($conn_args);

$amqp_conn->connect() ? :die('cannot connect broker');

$channel = new AMQPChannel($amqp_conn);

$exchange = new AMQPExchange($channel);
$exchange->setName($exchange_name);

// 新增LogProto对象,传入我们要设定的日志消息
$logProto = new LogProto();
$logProto->setLevel('warn');
$logProto->setTime(Date('Y-m-d H:i:s', time()));
$logProto->setTrace('you\'re modify a read-only file');
// 序列化对象
$packed = protobuf_encode($logProto);

for ($i=0; $i < 10; $i++) {
    // 对交换机发布我们的消息
    $exchange->publish($packed, $routing_key);
    sleep(random_int(1,3));
}
$amqp_conn->disconnect();

七:编写error日志生产者

跟warn日志生产者的大致一样,将路由键改成error.log以及重新定义LogProto的消息体内容即可。

<?php
// ErrorProducer.php

require './baseConfig.php';
require './LogProto.php';

$exchange_name = 'log_protobuf';
$routing_key = 'error.log';

$amqp_conn = new AMQPConnection($conn_args);

$amqp_conn->connect() ? :die('cannot connect broker');

$channel = new AMQPChannel($amqp_conn);

$exchange = new AMQPExchange($channel);
$exchange->setName($exchange_name);

$logProto = new LogProto();
$logProto->setLevel('error');
$logProto->setTime(Date('Y-m-d H:i:s', time()));
$logProto->setTrace('Call to undefined method RabbitMq::connect()');
$packed = protobuf_encode($logProto);
for ($i=0; $i < 10; $i++) {
    $exchange->publish($packed, $routing_key);
    sleep(random_int(1,3));
}
$amqp_conn->disconnect();

八:运行结果

现在我们总共有六个文件

  • logMessage.proto (完成使命了)
  • LogProto.php(需要引入的消息类)
  • baseConfig.php(配置文件,需要引入)
  • LogConsumer.php(日志消费者)
  • WarnProducer.php(警告日志生产者)
  • ErrorProducer.php(错误日志生产者)

分别开三个终端测试

# terminal-one
$ php LogConsumer.php

# terminal-two
$ php WarnProducer.php

# terminal-three
$ php ErrorProducer.php

其中日志消费者结果如下,输出符合预期。

Terminal-one的显示

RabbitMQ-management中显示刚才的处理的消息

RabbitMQ-managment

您或许感兴趣

发表评论

电子邮件地址不会被公开。