
RabbitMQ实战-日志采集器-Protobuf压缩
一:概要
《RabbitMQ实战指南》中第四章「解决Rabbit相关问题」中有关于日志采集器文中是使用python实现,本文使用php的实现以及消息采用protobuf压缩。中间使用到的是Topic交换机。

二:环境准备
- RabbitMQ
- PHP-Protobuf
- Protoc
三:Protoc编译消息类
假设我们中间传输的消息类为LogProto,编写以下logMessage.proto文件
// logMessage.proto syntax = "proto3"; message LogProto { string time = 1; //日志生成时间 string trace = 2; //日志附加内容 string level = 3; //日志级别 }
生成PHP类
$ php ~/php-protobuf-master/protobuf-gen-php.php logMessage.proto
现在我们目录有两个文件
- logMessage.proto (完成使命了)
- LogProto.php(需要引入的消息类)
四:编写配置文件 (为了偷懒)
<?php // baseConfig.php date_default_timezone_set('Asia/Shanghai'); // AMQP连接配置 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login'=>'user', 'password'=>'user', 'vhost'=>'/' ); // 消息反序列化 function protobuf_decode($packed, $object){ $obj = null; try{ $obj = new $object(); $obj->parseFromString($packed); }catch(Exception $e){ die($e->getMessage()); } return $obj; } // 消息序列化 function protobuf_encode($object){ return $object->serializeToString(); }
五:编写日志消息消费者
其中消费者最主要的就是
- 定义什么类型的交换机
- 定义好队列
- 将交换机与队列通过路由键绑定
- 将队列的消息进行消费,并对处理好的消息进行ack应答
<?php // LogConsumer.php require './baseConfig.php'; require './LogProto.php'; // 交换机名称 $exchange_name = 'log_protobuf'; // 队列名称 $queue_name = 'log_protobuf_topic'; // 路由键 $routing_key = '*.log'; // 连接AMQP $amqp_conn = new AMQPConnection($conn_args); $amqp_conn->connect() ? :die('cannot connect broker'); // 获取信道 $channel = new AMQPChannel($amqp_conn); // 定义Topic交换机 $exchange = new AMQPExchange($channel); $exchange->setType(AMQP_EX_TYPE_TOPIC); $exchange->setName($exchange_name); $exchange->declareExchange(); // 定义队列 $queue = new AMQPQueue($channel); $queue->setName($queue_name); $queue->declareQueue(); // 绑定 $queue->bind($exchange_name, $routing_key); while(true){ $queue->consume('consumer'); } $amqp_conn->disconnect(); // 消费Handler function consumer($envelop, $queue){ // 将传输的消息进行反序列成我们想要的LogProto类 $logProto = protobuf_decode($envelop->getBody(), LogProto::class); echo "receive>> {$logProto->getTime()}({$logProto->getLevel()}): {$logProto->getTrace()}".PHP_EOL; // ack应答 $queue->ack($envelop->getDeliveryTag()); }
六:编写warn日志生产者
其中生产者主要就是生产我们所需的消息,然后发布到对应的交换机即可。路由键「warn.log」可以被Topic模式的交换机路由键「*.log」匹配,如果前面有多个参数,日志消费者的路由键可以使用哈希模式「#.log」
<?php // WarnProducer.php require './baseConfig.php'; require './LogProto.php'; $exchange_name = 'log_protobuf'; $routing_key = 'warn.log'; $amqp_conn = new AMQPConnection($conn_args); $amqp_conn->connect() ? :die('cannot connect broker'); $channel = new AMQPChannel($amqp_conn); $exchange = new AMQPExchange($channel); $exchange->setName($exchange_name); // 新增LogProto对象,传入我们要设定的日志消息 $logProto = new LogProto(); $logProto->setLevel('warn'); $logProto->setTime(Date('Y-m-d H:i:s', time())); $logProto->setTrace('you\'re modify a read-only file'); // 序列化对象 $packed = protobuf_encode($logProto); for ($i=0; $i < 10; $i++) { // 对交换机发布我们的消息 $exchange->publish($packed, $routing_key); sleep(random_int(1,3)); } $amqp_conn->disconnect();
七:编写error日志生产者
跟warn日志生产者的大致一样,将路由键改成error.log以及重新定义LogProto的消息体内容即可。
<?php // ErrorProducer.php require './baseConfig.php'; require './LogProto.php'; $exchange_name = 'log_protobuf'; $routing_key = 'error.log'; $amqp_conn = new AMQPConnection($conn_args); $amqp_conn->connect() ? :die('cannot connect broker'); $channel = new AMQPChannel($amqp_conn); $exchange = new AMQPExchange($channel); $exchange->setName($exchange_name); $logProto = new LogProto(); $logProto->setLevel('error'); $logProto->setTime(Date('Y-m-d H:i:s', time())); $logProto->setTrace('Call to undefined method RabbitMq::connect()'); $packed = protobuf_encode($logProto); for ($i=0; $i < 10; $i++) { $exchange->publish($packed, $routing_key); sleep(random_int(1,3)); } $amqp_conn->disconnect();
八:运行结果
现在我们总共有六个文件
- logMessage.proto (完成使命了)
- LogProto.php(需要引入的消息类)
- baseConfig.php(配置文件,需要引入)
- LogConsumer.php(日志消费者)
- WarnProducer.php(警告日志生产者)
- ErrorProducer.php(错误日志生产者)
分别开三个终端测试
# terminal-one $ php LogConsumer.php # terminal-two $ php WarnProducer.php # terminal-three $ php ErrorProducer.php
其中日志消费者结果如下,输出符合预期。

RabbitMQ-management中显示刚才的处理的消息
