Files
orico-official-website-old/app/api/controller/KafkaProducer.php
2024-10-29 14:04:59 +08:00

95 lines
3.3 KiB
PHP
Executable File

<?php
//declare (strict_types = 1);
namespace app\api\controller;
class KafkaProducer
{
private $topic_name = 'log-const-total-data'; // kafka队列 topic
private $username = 'dolog'; // 队列账号
private $password = 'kafka_log_Aa-1221'; // 队列密码
private $brokerList = '127.0.0.1:9092';
public function send()
{
$data = request()->post();
$sign_data['action_logs'] = $data['action_logs'];
$sign_data['api_logs'] = $data['api_logs'];
$sign_data['lg_logs_view'] = $data['lg_logs_view'];
$sign_data['lg_task_queue'] = $data['lg_task_queue'];
$sign = $this->getSign($sign_data);
if ( empty($data['sign']) ) {
return error('签名为空');
}
if ( $sign !== $data['sign'] ) {
return error('签名错误');
}
if ( empty($data['action_logs']) && empty($data['api_logs']) && empty($data['lg_logs_view']) && empty($data['lg_task_queue']) ) {
return error('日志内容为空');
}
if ( !empty($data['action_logs']) ) {
$action_logs = unserialize($data['action_logs']);
}
if ( !empty($data['api_logs']) ) {
$api_logs = unserialize($data['api_logs']);
}
if ( !empty($data['lg_logs_view']) ) {
$lg_logs_view = unserialize($data['lg_logs_view']);
}
if ( !empty($data['lg_task_queue']) ) {
$lg_task_queue = unserialize($data['lg_task_queue']);
}
$do_add_logs = [['logs_action'=>$action_logs],['logs_api'=>$api_logs],['logs_view'=>$lg_logs_view],['task_queue'=>$lg_task_queue]];
$message = json_encode($do_add_logs);
$topic = $this->topic_name;
try{
$this->producer($message, $topic);
return success('日志写入成功');
}catch(Exception $e){
return error('日志写入失败');
}
}
/*
* 签名
* */
public function getSign($param)
{
ksort($param);
$signStr = "logsmonitor";
$secretKey = 'logsmonitorscretlist';
foreach ( $param as $key => $value ) {
$signStr = $signStr . $key . "=" . $value . "&";
}
$signStr = substr($signStr, 0, -1);
$signature = base64_encode(hash_hmac("sha1", $signStr, $secretKey, true));
return $signature;
}
public function producer($message, $topic = 'log-const-total-data')
{
$conf = new \RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('security.protocol', 'sasl_plaintext');
$conf->set('sasl.username', $this->username);
$conf->set('sasl.password', $this->password);
$conf->set('socket.keepalive.enable', 'true'); // socket不操持保持长连
$conf->set('log.connection.close', 'false'); // 静默所有自发断开连接日志
$conf->set('metadata.broker.list', $this->brokerList);
$producer = new \RdKafka\Producer($conf);
$topic = $producer->newTopic($topic);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));
$producer->poll(0);
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new Exception('Was unable to flush, messages might be lost!');
}
}
}