当前位置: 首页 > news >正文

外部访问K8S集群内部的kafka集群服务

不许转载

kafka 部署
把 kafka 部署到 k8s 后,我们肯定是通过 service 从 k8s 外部访问 kafaka。这里的 service 要么是 NodePort, 要么是 LoadBalancer 类型。我们使用的方式是 LoadBalancer。
我们先看下面这张图,这是 kafka 在集群中的网络拓扑。当我们通过地址 12.345.67:31090 访问到 kafka 后,kafka 返回的访问地址是类似这样的 endpoint jettopro-kafka.jettopro-poc.svc.cluster.local:9092。这是 k8s 集群内部能访问的 headless service endpoint 地址,从集群外部自然使用这个地址是访问不通的。

所以,我们需要解决两个问题:

  1. kafka 不同的 pod 需要不通的对外能访问的地址
  2. 不能使用 kafka 默认的 advertised.listeners
解决方案

问题1,我们为每个 pod 创建类型是 LoadBalancer 的 service 并且监听不同的端口。这样通过 LB IP + port 就能找到特定的 kafka broker。
service 示例如下:

apiVersion: v1
kind: Service
metadata:name: kafka-{index}
spec:externalTrafficPolicy: Localtype: LoadBalancerselector:statefulset.kubernetes.io/pod-name: kafka-{index}ports:- protocol: TCPport: {9092+index}targetPort: 9092

这里如果不是云主机,也可以使用NodePort类型来暴露kafka服务。

问题2,我们在容器启动的时候,执行脚本动态获取 pod 编号,生成容器需要的环境变量 KAFKA_CFG_ADVERTISED_LISTENERS(对应 kafka broker 的配置 advertised.listener)
 

HOSTNAME=`hostname -s`
if [[ $HOSTNAME =~ (.*)-([0-9]+)$ ]]; thenORD=${BASH_REMATCH[2]}PORT=$((ORD + 9092))#12.345.67.8 是 LB 的 ipexport KAFKA_CFG_ADVERTISED_LISTENERS="PLAINTEXT://12.345.67.8:$PORT"
elseecho "Failed to get index from hostname $HOST"exit 1
fi
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafka
spec:selector:matchLabels:app: kafkaserviceName: kafkareplicas: 3updateStrategy:type: RollingUpdatepodManagementPolicy: OrderedReadytemplate:metadata:labels:app: kafkaspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: "app"operator: Invalues:- kafkatopologyKey: "kubernetes.io/hostname"containers:- name: kafkacommand:- bash- -ec- |HOSTNAME=`hostname -s`if [[ $HOSTNAME =~ (.*)-([0-9]+)$ ]]; thenORD=${BASH_REMATCH[2]}PORT=$((ORD + 9092))export KAFKA_CFG_ADVERTISED_LISTENERS="PLAINTEXT://12.345.67.8:$PORT"elseecho "Failed to get index from hostname $HOST"exit 1fiexec /entrypoint.sh /run.shimagePullPolicy: Alwaysimage: "bitnami/kafka:2"env:- name: ALLOW_PLAINTEXT_LISTENERvalue: "yes"- name: KAFKA_CFG_ZOOKEEPER_CONNECTvalue: "zookeeper-0.zookeeper-hs:2181,zookeeper-1.zookeeper-hs:2181,zookeeper-2.zookeeper-hs:2181"- name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTORvalue: "3"- name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISRvalue: "3"- name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTORvalue: "3"ports:- containerPort: 9092volumeMounts:- name: kafka-datamountPath: /bitnamisecurityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: kafka-dataspec:accessModes: [ "ReadWriteOnce" ]storageClassName: alicloud-disk-efficiencyresources:requests:storage: 20Gi

http://www.lryc.cn/news/224874.html

相关文章:

  • AttributeError: module ‘tensorflow‘ has no attribute ‘contrib‘解决办法
  • 物奇平台耳机恢复出厂设置功能实现
  • RFID携手制造业升级,为锂电池生产带来前所未有的可靠性
  • 【星海出品】flask (四) 三方工具使用
  • MongoDB 索引
  • [Hive] INSERT OVERWRITE DIRECTORY要注意的问题
  • 刚柔相济铸伟业 ——访湖南顺新金属制品科技有限公司董事长张顺新
  • DHorse(K8S的CICD平台)的实现原理
  • 类图复习:类图简单介绍
  • 【字符串】【双指针翻转字符串+快慢指针】Leetcode 151 反转字符串中单词【好】
  • 3D Gaussian Splatting:用于实时的辐射场渲染
  • 【nlp】文本处理的基本方法
  • C++17 std::filesystem
  • JVM在线分析-解决问题的工具一(jinfo,jmap,jstack)
  • [深度学习]不平衡样本的loss
  • 【MySQL】表的增删改查(强化)
  • MyBatis-Plus--在xml中使用wrapper的方法
  • Oracle RAC是啥?
  • springboot中定时任务cron不生效,fixedRate指定间隔失效,只执行一次的问题
  • 苹果手机发热发烫是什么原因?看完这篇你就知道了!
  • 民安智库(第三方满意度调研公司):助力健身房提升客户满意度的秘密武器
  • 2011年09月01日 Go生态洞察:Go语言词法扫描与App Engine演示
  • pytorch搭建squeezenet网络的整套工程(升级版)
  • 222. 完全二叉树的节点个数
  • adb and 软件架构笔记
  • 算术运算符、自增自减运算符、赋值运算符、关系运算符、逻辑运算符、三元运算符
  • k8s 配置资源管理
  • expo + react native项目隐藏状态栏踩坑
  • 若依:用sqlite3随便掰饬掰饬
  • 刚安装的MySQL使用Navicat操作数据库遇到的问题