init
This commit is contained in:
102
app/command/NewKafkaConsumer.php
Executable file
102
app/command/NewKafkaConsumer.php
Executable file
@@ -0,0 +1,102 @@
|
||||
<?php
|
||||
|
||||
namespace app\command;
|
||||
|
||||
|
||||
class NewKafkaConsumer extends Command
|
||||
{
|
||||
private $topic_name = 'orico-inquiry-data'; // kafka队列 topic
|
||||
private $groupid = 'OrderStatus_log'; // 队列group
|
||||
private $brokers = '127.0.0.1:9092'; // kafka broker
|
||||
private $username = 'dolog'; // 队列账号
|
||||
private $password = 'kafka_log_Aa-1221'; // 队列密码
|
||||
private $retries = 3; // 程序异常重试次数
|
||||
|
||||
protected function configure()
|
||||
{
|
||||
// 指令配置
|
||||
$this->setName('NewKafkaConsumer')
|
||||
->setDescription('the NewKafkaConsumer command');
|
||||
}
|
||||
|
||||
protected function execute(Input $input, Output $output)
|
||||
{
|
||||
$conf = new \RdKafka\Conf();
|
||||
$conf->set('group.id', $this->groupid);
|
||||
$conf->set('sasl.mechanisms', 'PLAIN');
|
||||
$conf->set('security.protocol', 'sasl_plaintext');
|
||||
$conf->set('sasl.username', $this->username);
|
||||
$conf->set('sasl.password', $this->password);
|
||||
$conf->set('enable.auto.offset.store', 'false'); // 不自动提交
|
||||
$conf->set('socket.keepalive.enable', 'true'); // socket不操持保持长连
|
||||
$conf->set('log.connection.close', 'false'); // 静默所有自发断开连接日志
|
||||
$conf->set('log_level',(string)LOG_DEBUG);
|
||||
// $conf->set('debug','all');
|
||||
$consumer = new \Rdkafka\Consumer($conf);
|
||||
$consumer->addBrokers($this->brokers);
|
||||
$topicConf = new \Rdkafka\TopicConf();
|
||||
$topicConf->set('auto.commit.interval.ms', '1000');
|
||||
$topicConf->set('offset.store.method', 'broker');
|
||||
$topicConf->set('auto.offset.reset', 'earliest');
|
||||
|
||||
$topic = $consumer->newTopic($this->topic_name, $topicConf);
|
||||
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
|
||||
while (true) {
|
||||
$msg = $topic->consume(0, 5000);
|
||||
if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
|
||||
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
|
||||
continue;
|
||||
} elseif ($msg->err) {
|
||||
echo $msg->errstr(), "\n";
|
||||
break;
|
||||
} else {
|
||||
//日志写入
|
||||
$payload_data = json_decode($msg->payload,true);
|
||||
$this->doAddLog($payload_data);
|
||||
// 记录要提交的消息offset
|
||||
$topic->offsetStore($msg->partition, $msg->offset);
|
||||
file_put_contents('./kafka_ceshi2.txt',$msg->payload.PHP_EOL,FILE_APPEND);
|
||||
echo $msg->payload, "\n";
|
||||
}
|
||||
}
|
||||
|
||||
// $output->writeln($topic);
|
||||
// return $topic;
|
||||
// 指令输出
|
||||
// $output->writeln('NewKafkaConsumer');
|
||||
}
|
||||
|
||||
private function doAddLog($payload_data)
|
||||
{
|
||||
$payload_data = json_decode($payload_data,true);
|
||||
if ( !empty($payload_data) ) {
|
||||
foreach($payload_data as $kk=>$vv) {
|
||||
if ( !empty($vv['logs_action']) ) {
|
||||
Db::connect('rds_mysql')->name('logs_action')->insertAll($vv['logs_action']);
|
||||
}
|
||||
if ( !empty($vv['logs_api']) ) {
|
||||
foreach($vv['logs_api'] as $kk2=>$vv2) {
|
||||
$api_post['beginCode'] = $vv2['beginCode'];
|
||||
$api_post['reurnCode'] = $vv2['reurnCode'];
|
||||
unset($vv2['beginCode']);
|
||||
unset($vv2['reurnCode']);
|
||||
$api_post['id'] = Db::connect('rds_mysql')->name('logs_api')->insertGetId($vv2);
|
||||
Db::connect('rds_mysql')->name('logs_api_dt')->insert($api_post);
|
||||
}
|
||||
}
|
||||
if ( !empty($vv['logs_view']) ) {
|
||||
Db::connect('rds_mysql')->name('logs_view')->insertAll($vv['logs_view']);
|
||||
}
|
||||
if ( !empty($vv['task_queue']) ) {
|
||||
Db::connect('rds_mysql')->name('task_queue')->insertAll($vv['task_queue']);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user