当前位置: 首页 > news >正文

PHP小白搭建Kafka环境以及初步使用rdkafka

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 一、安装java(Kafka必须安装java,因为kafka依赖java核心)
  • 二、安装以及配置Kafka、zookeeper
    • 1.下载Kafka(无需下载zookeeper,使用kafka自带的即可)
    • 2.配置topid
    • 3.安装PHP的rdkafka,这个网上教程很多,基本上都是正确的


前言

提示:windows环境安装失败,Linux环境安装成功(以下并没有windows安装示例)

一、安装java(Kafka必须安装java,因为kafka依赖java核心)

下载地址:链接: https://www.oracle.com/java/technologies/downloads/#jdk20-linux
在这里插入图片描述
将文件放在Linux目录中后进行解压:

假设我把[jdk-20_linux-x64_bin.tar.gz]包放在了/root/src/uap/web/third 目录下
1、tar -zxvf jdk-20_linux-x64_bin.tar.gz
2、mv jdk.0.20 ./jdk
3、vim /etc/profile JAVA_HOME=/root/src/uap/web/third/jdkPATH=/root/src/uap/web/third/jdk/bin:$PATHexport JAVA_HOME
4、source /ect/profile
5、java -version (出现下图极为成功)

在这里插入图片描述

二、安装以及配置Kafka、zookeeper

1.下载Kafka(无需下载zookeeper,使用kafka自带的即可)

下载地址:https://kafka.apache.org/downloads
提示:不要下载带src的那个,具体我也不知道,因为我也是个小白
在这里插入图片描述

假设我把[kafka_2.12-3.5.1.tgz]包放在了/root/src/uap/web/third 目录下
1、tar -zxvf kafka_2.12-3.5.1.tgz
2、mv kafka.2.12 ./kafka
3、创建kafka日志文件mkdir -p ./kafka_data/log/kafkamkdir -p ./kafka_data/log/zookeepermkdir -p ./kafka_data/zookeeper
4、cd ./kafka/config
vim server.propertieslisteners=PLAINTEXT://localhost:9092 (34行左右,添加对应的host、port)broker.id=0port=9092host.name=192.168.1.241log.dirs=/root/src/uap/web/third/kafka_data/log/kafkazookeeper.connect=localhost:2181
wd
vim zookeeper.propertiesdataDir=/root/src/uap/web/third/kafka_data/zookeeperdataLogDir=/root/src/uap/web/third/kafka_data/log/zookeeperclientPort=2181maxClientCnxns=100tickTimes=2000initLimit=10syncLimit=5
wd
5、cd ../ 进入kafka目录下
#启动zookeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
//如果其中报错,大部分应该是报JAVA_HOME 这个说明你没有配置 /etc/profile 上面有
./bin/kafka-server-start.sh -daemon ./config/server.properties &

2.配置topid

代码如下(示例):

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic myt
返回值:Created topic myt.  创建成功/否则失败

3.安装PHP的rdkafka,这个网上教程很多,基本上都是正确的

例如:阿里云开发者社区,php安装rdkafka教程
剩下逻辑就直接贴代码了

生产者:
public function producer(){$conf = new RdKafka\Conf();$conf->set('metadata.broker.list', 'localhost:9092');$producer = new RdKafka\Producer($conf);$topic = $producer->newTopic("mytest");//获取数据库数据,存入kafka中$wanchk = $this->db->query("SELECT * FROM hf_alarm_wanchk");foreach ($wanchk as $k => $v){$topic->produce(RD_KAFKA_PARTITION_UA, 0, array2json($v));$producer->poll(0);}$result = $producer->flush(10000);if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {throw new \RuntimeException('Was unable to flush, messages might be lost!');}$producer->purge(RD_KAFKA_PURGE_F_QUEUE);$producer->flush(10000);}
消费者:
//这个代码需要使用终端运行:
// /bin/php -c /etc/php.ini  -f  /入口文件目录/index.php (类)consumer (方法)consumerpublic function consumer(){$conf = new \RdKafka\Conf();$conf->set('group.id', 'mytest');$rk = new \RdKafka\Consumer($conf);$rk->addBrokers("127.0.0.1");$topicConf = new \RdKafka\TopicConf();$topicConf->set('auto.commit.interval.ms', 100);$topicConf->set('offset.store.method', 'broker');$topicConf->set('auto.offset.reset', 'smallest');$topic = $rk->newTopic('mytest', $topicConf);$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);while (true) {$message = $topic->consume(0, 120 * 10000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:var_dump($message);break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:echo "No more messages; will wait for more\n";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:echo "Timed out\n";break;default:throw new \Exception($message->errstr(), $message->err);break;}}} 
http://www.lryc.cn/news/149093.html

相关文章:

  • 【Java Web】敏感词过滤
  • stable diffusion实践操作-提示词
  • leetcode8.字符串转整数-Java
  • 从零开始的Hadoop学习(四)| SSH无密登录配置、集群配置
  • 微信小程序活动报名管理系统设计与实现
  • 用Kubernetes(k8s)的ingress部署https应用
  • 【附安装包】MyEclipse2020安装教程
  • 软件与软件工程
  • 记录一下:基于nginx配置的封禁真实IP
  • 【狂神】Spring5笔记(1-9)
  • Redis——急速安装并设置自启(CentOS)
  • C++中使用while循环
  • 视频融合平台EasyCVR视频汇聚平台关于小区高空坠物安全实施应用方案设计
  • IBM安全发布《2023年数据泄露成本报告》,数据泄露成本创新高
  • python爬虫—requests
  • 应用案例 | 3D视觉引导解决方案汽车零部件上下料
  • const {}解构赋值
  • 一篇文章带你了解-selenium工作原理详解
  • H5 + C3基础(八)(3d转换 位移 旋转)
  • PyQt6 GUI界面设计和Nuitka包生成exe程序(全笔记)
  • 学习网络编程No.5【TCP套接字通信】
  • 常用的时间段的时间戳
  • 博客系统后台控制层接口编写
  • 生成 MySQL 删除索引、创建索引、分析表的 SQL 语句
  • mongodb建用户
  • 无门槛访问ChatGPT升级版-数据指北AI
  • 前端需要学习哪些技术?
  • 详解排序算法(附带Java/Python/Js源码)
  • 手写Mybatis:第8章-把反射用到出神入化
  • 基于AI智能分析网关EasyCVR视频汇聚平台关于能源行业一体化监控平台可实施应用方案