post();

        // Only the four log payload fields take part in the signature.
        $sign_data['action_logs'] = $data['action_logs'];
        $sign_data['api_logs'] = $data['api_logs'];
        $sign_data['lg_logs_view'] = $data['lg_logs_view'];
        $sign_data['lg_task_queue'] = $data['lg_task_queue'];
        $sign = $this->getSign($sign_data);

        if ( empty($data['sign']) ) {
            return error('签名为空');
        }
        // NOTE(review): `!==` is not a constant-time comparison; consider hash_equals().
        if ( $sign !== $data['sign'] ) {
            return error('签名错误');
        }
        // Reject the request only when all four payload fields are empty.
        if ( empty($data['action_logs']) && empty($data['api_logs']) && empty($data['lg_logs_view']) && empty($data['lg_task_queue']) ) {
            return error('日志内容为空');
        }

        // NOTE(review): unserialize() runs on request data here; the signature check
        // above is the only guard. Consider json_decode() or the `allowed_classes`
        // option if the payloads never contain objects — TODO confirm with the sender.
        if ( !empty($data['action_logs']) ) {
            $action_logs = unserialize($data['action_logs']);
        }
        if ( !empty($data['api_logs']) ) {
            $api_logs = unserialize($data['api_logs']);
        }
        if ( !empty($data['lg_logs_view']) ) {
            $lg_logs_view = unserialize($data['lg_logs_view']);
        }
        if ( !empty($data['lg_task_queue']) ) {
            $lg_task_queue = unserialize($data['lg_task_queue']);
        }

        // NOTE(review): each of the four variables below is only assigned when its
        // field is non-empty, so some may be undefined at this point unless they are
        // initialised earlier in the method (not visible from here).
        $do_add_logs = [['logs_action'=>$action_logs],['logs_api'=>$api_logs],['logs_view'=>$lg_logs_view],['task_queue'=>$lg_task_queue]];
        $message = json_encode($do_add_logs);
        $topic = $this->topic_name;
        try{
            $this->producer($message, $topic);
            return success('日志写入成功');
        }catch(Exception $e){
            return error('日志写入失败');
        }
    }

    /**
     * Build the signature for a set of request parameters.
     *
     * The parameters are sorted by key, rendered as "key=value" pairs joined with
     * "&", prefixed with the literal "logsmonitor", signed with HMAC-SHA1 (raw
     * binary output) and finally base64-encoded.
     *
     * @param array $param Parameters to sign; each value is concatenated as a string.
     * @return string Base64-encoded HMAC-SHA1 signature.
     */
    public function getSign($param)
    {
        ksort($param);

        $pairs = [];
        foreach ($param as $key => $value) {
            $pairs[] = $key . '=' . $value;
        }

        // Joining the pairs (instead of appending "&" after each one and trimming
        // the last character) keeps the prefix intact when $param is empty.
        $signStr = 'logsmonitor' . implode('&', $pairs);
        $secretKey = 'logsmonitorscretlist';

        return base64_encode(hash_hmac('sha1', $signStr, $secretKey, true));
    }

    /**
     * Publish one message to a Kafka topic and wait for it to be flushed.
     *
     * The message is JSON-encoded before being produced.
     * NOTE(review): the caller visible in this file already passes a JSON string,
     * so the payload ends up encoded twice; left unchanged because consumers may
     * depend on that format — TODO confirm.
     *
     * @param mixed  $message Value to send; it is passed through json_encode().
     * @param string $topic   Name of the Kafka topic to produce to.
     * @return void
     * @throws Exception When the message cannot be JSON-encoded, or when flush()
     *                   does not report success within its 10000 ms timeout.
     */
    public function producer($message, $topic = 'log-const-total-data')
    {
        // Encode up front: json_encode() returns false on failure, and passing that
        // on to produce() could publish an empty message without any error.
        $payload = json_encode($message);
        if (false === $payload) {
            throw new Exception('Unable to JSON-encode message: ' . json_last_error_msg());
        }

        $conf = new \RdKafka\Conf();
        $conf->set('sasl.mechanisms', 'PLAIN');
        $conf->set('security.protocol', 'sasl_plaintext');
        $conf->set('sasl.username', $this->username);
        $conf->set('sasl.password', $this->password);
        // Enable TCP keep-alive on the broker sockets.
        $conf->set('socket.keepalive.enable', 'true');
        // Do not log connection-close events.
        $conf->set('log.connection.close', 'false');
        $conf->set('metadata.broker.list', $this->brokerList);

        $producer = new \RdKafka\Producer($conf);
        $kafkaTopic = $producer->newTopic($topic);

        // RD_KAFKA_PARTITION_UA: no explicit partition is chosen by this code.
        $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
        $producer->poll(0);

        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new Exception('Was unable to flush, messages might be lost!');
        }
    }
}