当前位置: 首页 > news >正文

docker-compose安装kafka和php简单测试

docker-compose.yml内容:

 version: '3.1'
services:
  zookeeper:
    container_name: zookeeper
    image: zookeeper:3.6
    ports:
      - 2181:2181
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    volumes:
      - /docker/kafka:/kafka
    environment:
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_BROKER_NO: 0
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.153.128:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  kafka-manager:
    container_name: kafka-manager
    image: kafkamanager/kafka-manager:latest
    ports:
      - 9000:9000
    environment:
      ZK_HOSTS: zookeeper:2181
      KAFKA_MANAGER_AUTH_ENABLED: "true"
      KAFKA_MANAGER_USERNAME: admin
      KAFKA_MANAGER_PASSWORD: admin
    depends_on:
      - kafka
      - zookeeper

web管理页面:

http://192.168.153.128:9000

php生产者:

<?php
namespace app\index\controller;

class KafkaProducer
{
    public function index()
    {
        $rk = new \RdKafka\Producer();
        $rk->addBrokers("192.168.153.128"); //kafka服务器地址
        $topic = $rk->newTopic("topic1"); //topic名称
        for ($i = 0; $i < 5; $i++) {
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, "发送信息成功: $i");
            $rk->poll(0);
        }
        while ($rk->getOutQLen() > 0) {
            $rk->poll(50);
        }
    }
}

生产者路径:http://tp6api.local/index/kafkaproducer

php消费者:

<?php
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');

$rk = new \RdKafka\Consumer($conf);
$rk->addBrokers("192.168.153.128:9092");
$topicConf = new \RdKafka\TopicConf();

$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', sys_get_temp_dir());
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("topic1", $topicConf);

// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
    $message = $topic->consume(0, 120*10000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            //没有错误打印信息
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "等待接收信息\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "超时\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

消费者路径:

 

http://www.lryc.cn/news/23698.html

相关文章:

  • 【蓝桥云课】快速幂
  • 解决windows安装wxPython安装失败、速度过慢及PyCharm上wx包爆红问题
  • 封装小程序request请求[接口函数]
  • 嵌入式 STM32 通讯协议--MODBUS
  • 互联网人看一看,这些神器你用过哪些?
  • Kotlin学习:5.2、异步数据流 Flow
  • EPICS synApps介绍
  • Pycharm和跳板机 连接内网服务器
  • mysql去重查询的三种方法
  • PHP反序列化
  • 什么蓝牙耳机打电话效果最好?通话效果好的无线蓝牙耳机
  • Tesseract centos环境安装,基于springboot图片提取文字
  • Elasticsearch7.8.0版本优化——写入速度优化
  • 【Redis】Redis主从同步中数据同步原理
  • Python基础—while循环
  • linux基础(管道符,检索,vim和vi编辑使用)
  • GAN | 代码简单实现生成对抗网络(GAN)(PyTorch)
  • 华为面试题就这?00后卷王直接拿下30k华为offer......
  • html的常见标签使用
  • STM32——毕设智能感应窗户
  • golang archive/tar库的学习
  • MongoDB 详细教程,这一篇就够啦
  • python为什么慢
  • Android kotlin 组件间通讯 - LiveEventBus 及测试(更新中)
  • linux服务器时间同步
  • 扒系统CR8记录
  • 面试题(基础篇)
  • 如何利用ReconPal将自然语言处理技术应用于信息安全
  • 攻略 | 6步帮助中小微企业开拓东盟机电产品市场
  • Linux服务器磁盘分区、挂载、卸载及报错处理