当前位置: 首页 > news >正文

php实现kafka

kafka类:

<?phpclass b2c_kafka
{public $broker_list;public $topic;public $group_id;protected $producer = null;protected $consumer = null;protected $receive_wait_time;protected $receive_wait_num;/*** 构造方法* @param object app*/public function __construct(){$this->broker_list = 'kafka';$this->topic = 'local-cn';$this->group_id = ' kafka-map';$this->producer = null;$this->consumer = null;$this->receive_wait_time = 10;$this->receive_wait_num = 100;}/*** 获取生产者*/public function Producer(){$conf = new \RdKafka\Conf();// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// 0:不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成// 1:leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功// all:leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成$conf->set('acks', '0');//If you need to produce exactly once and want to keep the original produce order, uncomment the line below//$conf->set('enable.idempotence', 'true');$producer = new \RdKafka\Producer($conf);$this->producer = $producer;return $this;}/*** 发送消息** @param string|array $msg* @param string $topic* @return void*/public function SendMsg($msg = '', $topic = ''){if (empty($topic)) {$topic = $this->topic;}$producer = $this->producer;$topic = $producer->newTopic($topic);if (!is_array($msg)) {$msg = [$msg];}foreach ($msg as $value) {$topic->produce(RD_KAFKA_PARTITION_UA, 0, $value);$producer->poll(0);}for ($flushRetries = 0; $flushRetries < count($msg); $flushRetries++) {$result = $producer->flush(10000);            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {break;}}if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {throw new \RuntimeException('Kafka消息发送失败,错误代码:' . $result);}}/*** 获取消费者** @param string $group_id* @return void*/public function Consumer($group_id = ''){$conf = new \RdKafka\Conf();// Set a rebalance callback to log partition assignments (optional)$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {switch ($err) {case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:echo "Assign: ";var_dump($partitions);$kafka->assign($partitions);break;case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:echo "Revoke: ";var_dump($partitions);$kafka->assign(NULL);break;default:throw new \Exception($err);}});// Configure the group.id. All consumer with the same group.id will consume// different partitions.if (empty($group_id)) {$group_id = $this->group_id;}// 设置相同的group,防止一次消息被多次消费。// 消费者启动的进程数应小于等于topic的分区数,否则多余的进程是无用的$conf->set('group.id', $group_id);// Initial list of Kafka brokers// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// Set where to start consuming messages when there is no initial offset in// offset store or the desired offset is out of range.// 'smallest': start from the beginning//earliest:简单理解为从头开始消费,latest:简单理解为从最新的开始消费$conf->set('auto.offset.reset', 'earliest');// 在interval.ms的时间内定期向ZooKeeper提交使用者已获取的消息的偏移量// 自动提交分区消费的位置,手动可确保消息被消费$conf->set('enable.auto.commit', true);$conf->set('auto.commit.interval.ms', 1000);$consumer = new \RdKafka\KafkaConsumer($conf);$this->consumer = $consumer;return $this;}/*** 接收消息** @param string $topic* @param array $callback* @return void*/public function ReceiveMsg($topic = '', array $callback = []){$consumer = $this->consumer;if (empty($topic)) {$topic = $this->topic;}if (!is_array($topic)) {$topic = [$topic];}// Subscribe to topic 'test'$consumer->subscribe($topic);echo "Waiting for partition assignment... (make take some time when\n";echo "quickly re-joining the group after leaving it.)\n";$i = 0;$msg_list = [];while (true) {$i++;if ($i > $this->receive_wait_time) {$i = 0;if (empty($msg_list)) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$msg_list = [];}// 阻塞一秒钟$message = $consumer->consume(1000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:$msg_list[] = $message->payload;if (count($msg_list) < $this->receive_wait_num) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$i = 0;$msg_list = [];break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:// echo "No more messages; will wait for more\n";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:// echo "Timed out\n";break;default:throw new \Exception($message->errstr(), $message->err);break;}}}}

调用:

$data['member_id']     = '121221';
$data['ip']            = $this->getIp();
$data['timestamp']     = $this->getMicrotime();$kafkaObj = new kafka();
$kafkaObj->Producer()->sendMsg(json_encode($data, 320));
http://www.lryc.cn/news/439208.html

相关文章:

  • YOLOv10改进系列,YOLOv10损失函数更换为Powerful-IoU(2024年最新IOU),助力高效涨点
  • 工具知识 | Linux 常用命令参考手册
  • mysql 常用知识点总结
  • conda常用指令
  • 前后端分离项目--下载功能
  • PMP--一模--解题--81-90
  • 计算机网络 --- 【2】计算机网络的组成、功能
  • 『功能项目』切换职业技能面板【49】
  • 寻找排名好的自闭症学校?这些关键因素不可忽视
  • Git常用命令(记录)
  • STM32+ESP8266 WiFi连接机智云平台APP远程控制教程
  • 学懂C++(六十):C++ 11、C++ 14、C++ 17、C++ 20新特性大总结(万字详解大全)
  • 杭电1008电梯
  • 【Python小知识 - 2】:在VSCode中切换Python解释器版本
  • ubuntu meson安装
  • 记者协会评审系统-需求分析
  • python 检索与该查询最相似的句子 使用库hflayers和sentence_transformers来实现遇到的问题
  • 计算机毕业设计 在线新闻聚合平台的设计与实现 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试
  • 【机器学习随笔】概率论与实际问题的对应
  • C++初阶学习——探索STL奥秘——模拟实现list类
  • 生命之光不灭:帕金森综合征晚期,如何携手共度温暖岁月
  • Matlab simulink建模与仿真 第十五章(信号源库)
  • Java笔记 2 java概述和基础知识
  • 在使用ST-Link下载程序时出现“Error: Flash Download failed - Cortex-”
  • 长沙自闭症青少年学校:实现孩子的全面成长
  • 系统 IO
  • Mysql InnoDB 存储引擎简介
  • 驾校预约学习系统的设计与实现
  • Python--读取文件时出现的报错
  • 基于http请求的一种安全校验认证方案记录