addBrokers("127.0.0.1:9092,127.0.0.2:9092");

    // NOTE(review): credentials, the signing key, the API token and the API URL
    // are hard-coded in this file; consider moving them to configuration.
    private $username = 'OricoInquiry'; // Kafka SASL account
    private $password = 'kafka_orico_inquiry-f2b211-1221'; // Kafka SASL password
    private $retries = 3; // Retry count on program exceptions (not read in the code visible here)

    /**
     * Sets the console command name and description.
     */
    protected function configure()
    {
        $this->setName('NewKafkaConsumer')
            ->setDescription('the NewKafkaConsumer command');
    }

    /**
     * Resets the search-form state to empty strings.
     *
     * @param array $search Filled in place with the keys name, timebegin and timeend.
     */
    private function init_search(&$search)
    {
        $search['name'] = '';
        $search['timebegin'] = '';
        $search['timeend'] = '';
    }

    /**
     * Accepts serialized log payloads via POST, checks their signature and
     * forwards them to Kafka on $this->topic_name.
     *
     * Expected POST fields: action_logs, api_logs, lg_logs_view, lg_task_queue
     * (each optional, but at least one must be non-empty) and sign.
     *
     * @return mixed Result of the success()/error() helpers.
     */
    public function send()
    {
        $data = request()->post();
        $logFields = ['action_logs', 'api_logs', 'lg_logs_view', 'lg_task_queue'];

        // A missing field signs as the empty string, which is what a null value
        // concatenated to in getSign() before; this just avoids the
        // undefined-index notice.
        $sign_data = [];
        foreach ($logFields as $field) {
            $sign_data[$field] = isset($data[$field]) ? $data[$field] : '';
        }
        $sign = $this->getSign($sign_data);

        if (empty($data['sign'])) {
            return error('签名为空');
        }
        // Constant-time comparison; a non-string sign can never match.
        if (!is_string($data['sign']) || !hash_equals($sign, $data['sign'])) {
            return error('签名错误');
        }

        $hasContent = false;
        foreach ($logFields as $field) {
            if (!empty($data[$field])) {
                $hasContent = true;
                break;
            }
        }
        if (!$hasContent) {
            return error('日志内容为空');
        }

        // Every slot is defined (null when absent) so the message below never
        // reads an undefined variable.
        // NOTE(review): unserialize() runs on request data. It is only reached
        // after the HMAC check, but JSON or the allowed_classes option would be
        // safer -- confirm what the senders actually transmit.
        $logs = [];
        foreach ($logFields as $field) {
            $logs[$field] = empty($data[$field]) ? null : unserialize($data[$field]);
        }

        $do_add_logs = [
            ['logs_action' => $logs['action_logs']],
            ['logs_api' => $logs['api_logs']],
            ['logs_view' => $logs['lg_logs_view']],
            ['task_queue' => $logs['lg_task_queue']],
        ];
        $message = json_encode($do_add_logs);
        $topic = $this->topic_name;

        try {
            $this->producer($message, $topic);
            return success('日志写入成功');
        } catch (\Exception $e) {
            return error('日志写入失败');
        }
    }

    /**
     * Computes the request signature.
     *
     * The parameters are sorted by key and joined as "key=value&..." behind the
     * fixed prefix "logsmonitor"; the final character is dropped and the result
     * is HMAC-SHA1 signed and base64 encoded.
     *
     * Note: with an empty $param the dropped character is the last letter of
     * the prefix itself. This is kept as-is so existing signatures stay valid.
     *
     * @param array $param Key/value pairs to sign.
     * @return string Base64-encoded raw HMAC-SHA1 digest.
     */
    public function getSign($param)
    {
        ksort($param);
        $signStr = "logsmonitor";
        $secretKey = 'logsmonitorscretlist';
        foreach ($param as $key => $value) {
            $signStr = $signStr . $key . "=" . $value . "&";
        }
        $signStr = substr($signStr, 0, -1);

        return base64_encode(hash_hmac("sha1", $signStr, $secretKey, true));
    }

    /**
     * Creates a Kafka producer, publishes one JSON-encoded message and flushes.
     *
     * @param mixed  $message Payload; it is passed through json_encode() here.
     * @param string $topic   Target topic name.
     * @return int 1 on success.
     * @throws Exception When the flush does not complete without error.
     */
    public function producer($message, $topic = 'orico-inquiry-data')
    {
        $conf = new \RdKafka\Conf();
        $conf->set('sasl.mechanisms', 'PLAIN');
        $conf->set('security.protocol', 'sasl_plaintext');
        $conf->set('sasl.username', $this->username);
        $conf->set('sasl.password', $this->password);
        $conf->set('socket.keepalive.enable', 'true'); // enable TCP keep-alive on broker sockets
        $conf->set('log.connection.close', 'false'); // silence logs for spontaneous disconnects
        $conf->set('metadata.broker.list', $this->brokers);

        $producer = new \RdKafka\Producer($conf);
        $topic = $producer->newTopic($topic);
        // NOTE(review): send() passes an already JSON-encoded string, so that
        // payload is encoded twice -- confirm the consumer expects this.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));
        $producer->poll(0);

        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            // NOTE(review): unqualified class name; which class this resolves to
            // depends on the namespace/use lines outside this view.
            throw new Exception('Was unable to flush, messages might be lost!');
        }

        return 1;
    }

    /**
     * Builds the where-clause shared by the table export methods.
     *
     * Always filters on stat in (0,1) and the current country code. A non-empty
     * 'name' parameter adds a LIKE filter on $likeFields; a non-empty
     * 'timebegin' or 'timeend' parameter adds a createtime range.
     *
     * @param array  $data       Request parameters.
     * @param string $likeFields Pipe-separated column list for the keyword filter.
     * @return array
     */
    private function buildListWhere(array $data, $likeFields)
    {
        $where = ['stat' => ['in', '0,1'], 'country_code' => $this->country_code];

        if (isset($data['name']) && $data['name'] != '') {
            $where[$likeFields] = ['like', '%' . $data['name'] . '%'];
        }

        $range = $this->buildCreateTimeRange($data);
        if ($range !== null) {
            $where['createtime'] = $range;
        }

        return $where;
    }

    /**
     * Builds the createtime range condition.
     *
     * @param array $data Request parameters.
     * @return array|null Null when neither bound was supplied. Otherwise both
     *                    bounds, with a missing lower bound defaulting to
     *                    "0000-00-00" and a missing upper bound to one month
     *                    from now.
     */
    private function buildCreateTimeRange(array $data)
    {
        $hasBegin = isset($data['timebegin']) && $data['timebegin'] != '';
        $hasEnd = isset($data['timeend']) && $data['timeend'] != '';
        if (!$hasBegin && !$hasEnd) {
            return null;
        }

        return [
            ['>=', $hasBegin ? $data['timebegin'] : "0000-00-00"],
            ['<=', $hasEnd ? $data['timeend'] : date('Y-m-d H:i:s', strtotime('+1 month'))],
        ];
    }

    /**
     * Wraps one feedback record in the task-queue entry format.
     *
     * @param array $feedBack Normalised feedback record.
     * @return array
     */
    private function buildQueueTask(array $feedBack)
    {
        $payload = json_encode($feedBack);

        return [
            'database' => 'orico_cms', // database
            'module' => 'customer', // module
            'worked' => 'add', // operation
            'data_feed' => $payload, // data
            'create_time' => date("Y-m-d H:i:s"), // execution start time
            'end_time' => '', // execution end time
            'elapsed_time' => '', // elapsed time
            'stat' => 2, // status
            'create_uid' => '1', // id of the submitting user
            'api_url' => 'https://dev.crm.api.f2b211.com/admin/v2_0/inquiry', // API called by the consumer
            'token' => 'efbdb6337a837ecfd36240276b74108c', // token for that API
            'method' => 'POST', // HTTP method used by the consumer
            'post_data' => $payload, // request body sent by the consumer
        ];
    }

    /**
     * Publishes a task list on the default producer topic and prints the result.
     *
     * The 'lg_task_queue' key is only present when there is at least one task,
     * matching the envelope the export methods have always produced.
     *
     * @param array $tasks Entries built by buildQueueTask().
     */
    private function publishTaskQueue(array $tasks)
    {
        $dataFeed = [];
        if (!empty($tasks)) {
            $dataFeed['lg_task_queue'] = $tasks;
        }
        $dataFeed['database'] = 'orico_cms';
        $dataFeed['dotype'] = 'queue';

        print_r($this->producer($dataFeed));
    }

    /**
     * "Become a Distributor/Reseller": exports rows of the agents table to the
     * task queue with feedback_type 1.
     *
     * Exceptions raised while mapping or publishing are caught and their
     * message is echoed.
     */
    public function Distributor()
    {
        $where = $this->buildListWhere($this->request->param(), 'name|interest|phone');
        $list = Db('agents')->where($where)->order('id DESC')->select();

        try {
            $tasks = [];
            foreach ($list as $item) {
                $tasks[] = $this->buildQueueTask([
                    'name' => $item['name'] . ' ' . $item['last_name'],
                    'phone' => $item['phone'],
                    'email' => $item['email'],
                    'website' => $item['uri'],
                    'company' => $item['company'],
                    'country_timezone_id' => $item['state'],
                    'create_time' => $item['createtime'],
                    'industry' => '',
                    'bbs_account' => '',
                    'inquiry_subject' => '',
                    'inquiry_content' => '',
                    'interest_products' => $item['interest'],
                    'inquiry_source_page' => $item['url'],
                    'agent_country' => $item['country'],
                    'feedback_refer' => $item['refer'],
                    'feedback_ip' => $item['ip'],
                    'feedback_country' => $item['state'],
                    'feedback_province' => $item['province'],
                    'feedback_city' => $item['city'],
                    'inquiry_device' => $item['drvice'],
                    'feedback_type' => 1,
                ]);
            }
            $this->publishTaskQueue($tasks);
        } catch (\Exception $e) {
            echo $e->getMessage();
        }
    }

    /**
     * Exports rows of the bulk table to the task queue with feedback_type 2.
     *
     * Exceptions raised while mapping or publishing are caught and their
     * message is echoed.
     */
    public function Odm()
    {
        $where = $this->buildListWhere($this->request->param(), 'first_name|last_name|interested|phone');
        $list = Db('bulk')->where($where)->order('id DESC')->select();

        try {
            $tasks = [];
            foreach ($list as $item) {
                $tasks[] = $this->buildQueueTask([
                    'website' => '',
                    'bbs_account' => '',
                    'industry' => '',
                    'inquiry_subject' => '',
                    'agent_country' => '',
                    'name' => $item['first_name'] . ' ' . $item['last_name'],
                    'phone' => $item['phone'],
                    'email' => $item['email'],
                    'company' => $item['company'],
                    'country_timezone_id' => $item['country'],
                    'create_time' => $item['createtime'],
                    'interest_products' => $item['interested'],
                    'inquiry_content' => $item['message'],
                    'inquiry_source_page' => $item['refer_url'],
                    'feedback_refer' => $item['refer'],
                    'feedback_ip' => $item['ip'],
                    'feedback_country' => $item['state'],
                    'feedback_province' => $item['province'],
                    'feedback_city' => $item['city'],
                    'inquiry_device' => $item['drvice'],
                    'feedback_type' => 2,
                ]);
            }
            $this->publishTaskQueue($tasks);
        } catch (\Exception $e) {
            echo $e->getMessage();
        }
    }

    /**
     * Exports rows of the bulk_inquiry table to the task queue with
     * feedback_type 2.
     *
     * NOTE(review): the original comment above this method read "OEM&ODM
     * Service" and it shares feedback_type 2 with Odm(); Msgform() maps
     * "Bulk Buy" to 3 -- confirm which value is intended here.
     *
     * Exceptions raised while mapping or publishing are caught and their
     * message is echoed.
     */
    public function Bulk()
    {
        // The keyword filter is keyed on 'name', like the sibling methods. It
        // used to test 'first_name' but read 'name', an undefined index.
        $where = $this->buildListWhere($this->request->param(), 'first_name|last_name|interest|phone');
        $list = Db('bulk_inquiry')->where($where)->order('id DESC')->select();

        try {
            $tasks = [];
            foreach ($list as $item) {
                $tasks[] = $this->buildQueueTask([
                    'website' => '',
                    'bbs_account' => '',
                    'country_timezone_id' => '',
                    'industry' => '',
                    'inquiry_subject' => '',
                    'agent_country' => '',
                    'name' => $item['first_name'] . ' ' . $item['last_name'],
                    'phone' => $item['phone'],
                    'email' => $item['email'],
                    'company' => $item['company'],
                    'create_time' => $item['createtime'],
                    'interest_products' => $item['interest'],
                    'inquiry_content' => $item['message'],
                    'inquiry_source_page' => $item['url'],
                    'feedback_refer' => $item['refer'],
                    'feedback_ip' => $item['ip'],
                    'feedback_country' => $item['state'],
                    'feedback_province' => $item['province'],
                    'feedback_city' => $item['city'],
                    'inquiry_device' => $item['drvice'],
                    'feedback_type' => 2,
                ]);
            }
            $this->publishTaskQueue($tasks);
        } catch (\Exception $e) {
            echo $e->getMessage();
        }
    }

    /**
     * Exports rows of the inquiry table to the task queue with feedback_type 0.
     *
     * NOTE(review): the original comment above this method read "Bulk Buy",
     * while Msgform() maps feedback_type 0 to "Pre-sales Inquiry" -- confirm.
     *
     * Exceptions raised while mapping or publishing are caught and their
     * message is echoed.
     */
    public function Inquiry()
    {
        $where = $this->buildListWhere($this->request->param(), 'first_name|last_name|inquiry|phone');
        $list = Db('inquiry')->where($where)->order('id DESC')->select();

        try {
            $tasks = [];
            foreach ($list as $item) {
                $tasks[] = $this->buildQueueTask([
                    'website' => '',
                    'bbs_account' => '',
                    'agent_country' => '',
                    'name' => $item['first_name'] . ' ' . $item['last_name'],
                    'phone' => $item['phone'],
                    'email' => $item['email'],
                    'company' => $item['company'],
                    'country_timezone_id' => $item['country'],
                    'create_time' => $item['createtime'],
                    'industry' => $item['industry'],
                    'inquiry_subject' => $item['interested'],
                    'interest_products' => $item['model'],
                    'inquiry_content' => $item['inquiry'],
                    'inquiry_source_page' => $item['murl'],
                    'feedback_refer' => $item['refer'],
                    'feedback_ip' => $item['ip'],
                    'feedback_country' => $item['state'],
                    'feedback_province' => $item['province'],
                    'feedback_city' => $item['city'],
                    'inquiry_device' => $item['drvice'],
                    'feedback_type' => 0,
                ]);
            }
            $this->publishTaskQueue($tasks);
        } catch (\Exception $e) {
            echo $e->getMessage();
        }
    }

    /**
     * "Contact" form: exports one page (12 rows) of Msgform records with
     * stat = 0 and one of the listed feedback types to the task queue.
     *
     * The optional 'skeyword' GET parameter filters on cname/content and is
     * also stored as the pagination query.
     *
     * Exceptions raised while mapping or publishing are caught and their
     * message is echoed.
     */
    public function Msgform()
    {
        $skeyword = $this->request->get('skeyword', '', 'urldecode');
        $arg_where = [
            'feedback_type' => ['in', 'Pre-sales Inquiry,OEM&ODM Service,Become a Distributor/Reseller,Bulk Buy,Media Cooperation'],
            'stat' => ['eq', 0],
        ];
        $arg_order = ['id' => 'desc'];
        $arg_field = ['*'];

        if (!empty($skeyword)) {
            $skeyword = trim($skeyword);
            $arg_where['cname|content'] = ['like', '%' . $skeyword . '%'];
            Config::set('paginate.query', ['skeyword' => $skeyword]); // pagination parameters
        }

        $dataObject = Loader::model('Msgform')->getPageList($arg_where, $arg_order, $arg_field, 12);
        // An empty page becomes an empty list so the loop below never iterates null.
        $list = $dataObject->isEmpty() ? [] : $dataObject->items();

        $feedType = [
            "Pre-sales Inquiry" => 0,
            "Become a Distributor/Reseller" => 1,
            "OEM&ODM Service" => 2,
            "Bulk Buy" => 3,
            "Media Cooperation" => 4,
            "After Sales Support" => 5,
        ];

        try {
            $tasks = [];
            foreach ($list as $item) {
                $tasks[] = $this->buildQueueTask([
                    'name' => $item['name'],
                    'website' => '',
                    'bbs_account' => '',
                    'interest_products' => '',
                    'agent_country' => '',
                    'phone' => '',
                    'email' => $item['contact'],
                    'company' => '',
                    'country_timezone_id' => $item['country'],
                    'industry' => '',
                    'create_time' => $item['createtime'],
                    'inquiry_subject' => $item['subject'],
                    'inquiry_content' => $item['content'],
                    'inquiry_source_page' => $item['url'],
                    'feedback_refer' => $item['refer'],
                    'feedback_ip' => $item['ip'],
                    'feedback_country' => $item['state'],
                    'feedback_province' => $item['province'],
                    'feedback_city' => $item['city'],
                    'inquiry_device' => $item['drvice'],
                    // trim() stringifies the numeric code, so this payload carries
                    // it as a string; unknown types become ''.
                    'feedback_type' => isset($feedType[$item['feedback_type']]) ? trim($feedType[$item['feedback_type']]) : '',
                ]);
            }
            $this->publishTaskQueue($tasks);
        } catch (\Exception $e) {
            echo $e->getMessage();
        }
    }
}