103 lines
3.9 KiB
PHP
Executable File
103 lines
3.9 KiB
PHP
Executable File
<?php
|
|
|
|
namespace app\command;
|
|
|
|
|
|
/**
 * Console command that consumes messages from a Kafka topic and writes the
 * log records they carry into database tables through the `rds_mysql`
 * connection.
 *
 * Only partition 0 of the topic is consumed. Offsets are stored manually after
 * a message has been handled (`enable.auto.offset.store` is disabled).
 *
 * NOTE(review): `Command`, `Input`, `Output` and `Db` are referenced by their
 * short names; the matching `use` statements are expected at the top of the
 * file — confirm they are present.
 */
class NewKafkaConsumer extends Command
{
    /** Kafka topic to consume from. */
    private $topic_name = 'orico-inquiry-data';

    /** Kafka consumer group id. */
    private $groupid = 'OrderStatus_log';

    /** Kafka broker list. */
    private $brokers = '127.0.0.1:9092';

    /**
     * SASL user name.
     *
     * NOTE(review): credentials are hard-coded in source; consider loading
     * them from configuration or the environment.
     */
    private $username = 'dolog';

    /** SASL password (see the review note on $username). */
    private $password = 'kafka_log_Aa-1221';

    /**
     * Intended number of retries on failure.
     *
     * NOTE(review): nothing in this class reads this property.
     */
    private $retries = 3;

    /**
     * Registers the command name and description.
     */
    protected function configure()
    {
        $this->setName('NewKafkaConsumer')
            ->setDescription('the NewKafkaConsumer command');
    }

    /**
     * Runs the consume loop: polls partition 0, persists each message through
     * doAddLog(), stores its offset, appends the raw payload to
     * ./kafka_ceshi2.txt and echoes it.
     *
     * The loop keeps polling on timeouts / end-of-partition and stops on the
     * first message that carries any other error.
     *
     * @param Input  $input  Console input (unused).
     * @param Output $output Console output (unused).
     */
    protected function execute(Input $input, Output $output)
    {
        $conf = new \RdKafka\Conf();
        $conf->set('group.id', $this->groupid);
        $conf->set('sasl.mechanisms', 'PLAIN');
        $conf->set('security.protocol', 'sasl_plaintext');
        $conf->set('sasl.username', $this->username);
        $conf->set('sasl.password', $this->password);
        // Offsets are stored explicitly via offsetStore() below.
        $conf->set('enable.auto.offset.store', 'false');
        // Keep the broker socket alive.
        $conf->set('socket.keepalive.enable', 'true');
        // Silence log lines for spontaneous connection closes.
        $conf->set('log.connection.close', 'false');
        $conf->set('log_level', (string)LOG_DEBUG);
        // For verbose librdkafka debugging: $conf->set('debug', 'all');

        $consumer = new \RdKafka\Consumer($conf);
        $consumer->addBrokers($this->brokers);

        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('auto.commit.interval.ms', '1000');
        $topicConf->set('offset.store.method', 'broker');
        $topicConf->set('auto.offset.reset', 'earliest');

        $topic = $consumer->newTopic($this->topic_name, $topicConf);
        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

        while (true) {
            $msg = $topic->consume(0, 5000);

            // The constant check is required by librdkafka 0.11.6; newer
            // librdkafka versions return NULL instead.
            if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                continue;
            }

            if ($msg->err) {
                echo $msg->errstr(), "\n";
                break;
            }

            // Persist the log records. The raw payload is handed over so that
            // it is decoded in exactly one place.
            $this->doAddLog($msg->payload);

            // Record the offset to be committed for this message.
            $topic->offsetStore($msg->partition, $msg->offset);

            file_put_contents('./kafka_ceshi2.txt', $msg->payload . PHP_EOL, FILE_APPEND);
            echo $msg->payload, "\n";
        }

        // Stop the partition consumer cleanly once the loop has been left.
        $topic->consumeStop(0);
    }

    /**
     * Writes the log records contained in one message payload.
     *
     * Each entry of the payload may carry the keys `logs_action`, `logs_api`,
     * `logs_view` and `task_queue`; non-empty ones are inserted into the
     * table of the same name. Every `logs_api` row is inserted individually
     * and its `beginCode` / `reurnCode` values are moved to `logs_api_dt`
     * together with the generated id.
     *
     * @param mixed $payload_data Raw JSON string, double-encoded JSON string,
     *                            or an already decoded array.
     */
    private function doAddLog($payload_data)
    {
        $records = $this->decodePayload($payload_data);

        foreach ($records as $record) {
            if (!is_array($record)) {
                // Nothing to index into; skip malformed entries.
                continue;
            }

            if (!empty($record['logs_action'])) {
                Db::connect('rds_mysql')->name('logs_action')->insertAll($record['logs_action']);
            }

            if (!empty($record['logs_api'])) {
                foreach ($record['logs_api'] as $apiRow) {
                    // Fresh detail row per iteration so no value can leak over
                    // from the previous API log entry.
                    $detail = [
                        'beginCode' => isset($apiRow['beginCode']) ? $apiRow['beginCode'] : null,
                        'reurnCode' => isset($apiRow['reurnCode']) ? $apiRow['reurnCode'] : null,
                    ];
                    unset($apiRow['beginCode'], $apiRow['reurnCode']);

                    $detail['id'] = Db::connect('rds_mysql')->name('logs_api')->insertGetId($apiRow);
                    Db::connect('rds_mysql')->name('logs_api_dt')->insert($detail);
                }
            }

            if (!empty($record['logs_view'])) {
                Db::connect('rds_mysql')->name('logs_view')->insertAll($record['logs_view']);
            }

            if (!empty($record['task_queue'])) {
                Db::connect('rds_mysql')->name('task_queue')->insertAll($record['task_queue']);
            }
        }
    }

    /**
     * Normalises a message payload to an array of records.
     *
     * Decodes at most twice: once for plain JSON and a second time when the
     * first pass yields another string (double-encoded JSON). An array is
     * returned unchanged; anything that does not end up as an array yields
     * an empty array.
     *
     * @param mixed $payload Raw or already decoded payload.
     * @return array Decoded records, or an empty array when there are none.
     */
    private function decodePayload($payload)
    {
        for ($pass = 0; $pass < 2 && is_string($payload); $pass++) {
            $payload = json_decode($payload, true);
        }

        return is_array($payload) ? $payload : [];
    }
}
|