Files
orico-official-website-old/app/command/NewKafkaConsumer.php
2024-10-29 14:04:59 +08:00

103 lines
3.9 KiB
PHP
Executable File

<?php
namespace app\command;
/**
 * Console command that consumes messages from a Kafka topic and writes the
 * log records they carry into the `rds_mysql` database connection.
 *
 * Each message payload is expected to be JSON describing a list of entries;
 * every entry may contain `logs_action`, `logs_api`, `logs_view` and
 * `task_queue` record sets, which are inserted into the tables of the same
 * name (see doAddLog()).
 *
 * NOTE(review): broker address and SASL credentials are hard-coded below.
 * They should be moved to environment/config storage and rotated.
 */
class NewKafkaConsumer extends Command
{
    private $topic_name = 'orico-inquiry-data'; // Kafka topic to consume
    private $groupid = 'OrderStatus_log';       // consumer group id
    private $brokers = '127.0.0.1:9092';        // Kafka broker list
    private $username = 'dolog';                // SASL username
    private $password = 'kafka_log_Aa-1221';    // SASL password
    // Intended number of retries on failure.
    // NOTE(review): not referenced anywhere in this class.
    private $retries = 3;

    /**
     * Registers the command name and description with the console.
     */
    protected function configure()
    {
        $this->setName('NewKafkaConsumer')
            ->setDescription('the NewKafkaConsumer command');
    }

    /**
     * Runs the consume loop on partition 0 of the configured topic.
     *
     * The loop polls with a 5 second timeout. Timeouts / end-of-partition are
     * ignored, any other Kafka error is printed and ends the loop. For every
     * regular message the payload is persisted through doAddLog() and only
     * then is its offset stored for commit, so a message whose processing
     * throws is not marked as consumed.
     *
     * @param Input  $input  Console input (unused).
     * @param Output $output Console output (unused; messages are echoed directly).
     */
    protected function execute(Input $input, Output $output)
    {
        $conf = new \RdKafka\Conf();
        $conf->set('group.id', $this->groupid);
        $conf->set('sasl.mechanisms', 'PLAIN');
        $conf->set('security.protocol', 'sasl_plaintext');
        $conf->set('sasl.username', $this->username);
        $conf->set('sasl.password', $this->password);
        // Offsets are stored manually via offsetStore() after processing.
        $conf->set('enable.auto.offset.store', 'false');
        // Keep the broker socket alive.
        $conf->set('socket.keepalive.enable', 'true');
        // Silence log lines for spontaneous connection closes.
        $conf->set('log.connection.close', 'false');
        $conf->set('log_level', (string)LOG_DEBUG);
        // $conf->set('debug', 'all'); // enable for verbose client debugging

        $consumer = new \RdKafka\Consumer($conf);
        $consumer->addBrokers($this->brokers);

        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('auto.commit.interval.ms', '1000');
        $topicConf->set('offset.store.method', 'broker');
        $topicConf->set('auto.offset.reset', 'earliest');

        $topic = $consumer->newTopic($this->topic_name, $topicConf);
        // Resume partition 0 from the last stored offset.
        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

        while (true) {
            $msg = $topic->consume(0, 5000);

            // Constant check required by librdkafka 0.11.6. Newer librdkafka
            // versions will return NULL instead.
            if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                continue;
            }

            if ($msg->err) {
                echo $msg->errstr(), "\n";
                break;
            }

            // Persist the log records carried by this message.
            $payload_data = json_decode($msg->payload, true);
            $this->doAddLog($payload_data);

            // Record the offset to be committed, now that processing succeeded.
            $topic->offsetStore($msg->partition, $msg->offset);

            // NOTE(review): debug dump to a path relative to the working
            // directory; the file grows without bound.
            file_put_contents('./kafka_ceshi2.txt', $msg->payload . PHP_EOL, FILE_APPEND);
            echo $msg->payload, "\n";
        }
    }

    /**
     * Inserts the log records contained in one decoded message payload.
     *
     * Accepts either the already-decoded array (what execute() passes) or a
     * JSON string (the shape produced when the producer double-encodes the
     * payload). Previously the value was unconditionally passed to
     * json_decode() a second time, which fails for an array argument and
     * caused normally-encoded payloads to never be written.
     *
     * @param array|string|null $payload_data Decoded payload, or a JSON string still to be decoded.
     */
    private function doAddLog($payload_data)
    {
        if (is_string($payload_data)) {
            $payload_data = json_decode($payload_data, true);
        }

        if (empty($payload_data) || !is_array($payload_data)) {
            return;
        }

        foreach ($payload_data as $entry) {
            if (!is_array($entry)) {
                // Scalar entries cannot carry any of the record sets below.
                continue;
            }

            if (!empty($entry['logs_action'])) {
                Db::connect('rds_mysql')->name('logs_action')->insertAll($entry['logs_action']);
            }

            if (!empty($entry['logs_api'])) {
                foreach ($entry['logs_api'] as $apiRow) {
                    // beginCode / reurnCode are stored in the logs_api_dt
                    // detail table, keyed by the id of the logs_api row.
                    // Missing keys become null rather than aborting the consumer.
                    $api_post = [
                        'beginCode' => $apiRow['beginCode'] ?? null,
                        'reurnCode' => $apiRow['reurnCode'] ?? null,
                    ];
                    unset($apiRow['beginCode'], $apiRow['reurnCode']);

                    $api_post['id'] = Db::connect('rds_mysql')->name('logs_api')->insertGetId($apiRow);
                    Db::connect('rds_mysql')->name('logs_api_dt')->insert($api_post);
                }
            }

            if (!empty($entry['logs_view'])) {
                Db::connect('rds_mysql')->name('logs_view')->insertAll($entry['logs_view']);
            }

            if (!empty($entry['task_queue'])) {
                Db::connect('rds_mysql')->name('task_queue')->insertAll($entry['task_queue']);
            }
        }
    }
}