setName('NewKafkaConsumer') ->setDescription('the NewKafkaConsumer command'); } protected function execute(Input $input, Output $output) { $conf = new \RdKafka\Conf(); $conf->set('group.id', $this->groupid); $conf->set('sasl.mechanisms', 'PLAIN'); $conf->set('security.protocol', 'sasl_plaintext'); $conf->set('sasl.username', $this->username); $conf->set('sasl.password', $this->password); $conf->set('enable.auto.offset.store', 'false'); // 不自动提交 $conf->set('socket.keepalive.enable', 'true'); // socket不操持保持长连 $conf->set('log.connection.close', 'false'); // 静默所有自发断开连接日志 $conf->set('log_level',(string)LOG_DEBUG); // $conf->set('debug','all'); $consumer = new \Rdkafka\Consumer($conf); $consumer->addBrokers($this->brokers); $topicConf = new \Rdkafka\TopicConf(); $topicConf->set('auto.commit.interval.ms', '1000'); $topicConf->set('offset.store.method', 'broker'); $topicConf->set('auto.offset.reset', 'earliest'); $topic = $consumer->newTopic($this->topic_name, $topicConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $msg = $topic->consume(0, 5000); if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) { // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead. continue; } elseif ($msg->err) { echo $msg->errstr(), "\n"; break; } else { //日志写入 $payload_data = json_decode($msg->payload,true); $this->doAddLog($payload_data); // 记录要提交的消息offset $topic->offsetStore($msg->partition, $msg->offset); file_put_contents('./kafka_ceshi2.txt',$msg->payload.PHP_EOL,FILE_APPEND); echo $msg->payload, "\n"; } } // $output->writeln($topic); // return $topic; // 指令输出 // $output->writeln('NewKafkaConsumer'); } private function doAddLog($payload_data) { $payload_data = json_decode($payload_data,true); if ( !empty($payload_data) ) { foreach($payload_data as $kk=>$vv) { if ( !empty($vv['logs_action']) ) { Db::connect('rds_mysql')->name('logs_action')->insertAll($vv['logs_action']); } if ( !empty($vv['logs_api']) ) { foreach($vv['logs_api'] as $kk2=>$vv2) { $api_post['beginCode'] = $vv2['beginCode']; $api_post['reurnCode'] = $vv2['reurnCode']; unset($vv2['beginCode']); unset($vv2['reurnCode']); $api_post['id'] = Db::connect('rds_mysql')->name('logs_api')->insertGetId($vv2); Db::connect('rds_mysql')->name('logs_api_dt')->insert($api_post); } } if ( !empty($vv['logs_view']) ) { Db::connect('rds_mysql')->name('logs_view')->insertAll($vv['logs_view']); } if ( !empty($vv['task_queue']) ) { Db::connect('rds_mysql')->name('task_queue')->insertAll($vv['task_queue']); } } } } }