当前位置: 首页 > news >正文

卡夫卡的理解

一、架构理解

在这个单聊新架构中,涉及多个服务器组件共同协作来实现单聊功能。

  • ChatAccessServer:可能负责处理单聊相关的访问请求,比如用户登录单聊以及发送单消息的请求接入。
  • ChatHttpPushServer:推测其用于通过 HTTP 协议推送单聊消息等相关信息。
  • chatConsumerServer:可能用于消费处理特定的消息或数据,如处理单聊历史纪录和登录信息等。

消息流转过程大致为:单聊消息和登录相关操作产生的数据进入 Kafka,然后由 chatConsumerServer 进行消费处理,可能将单聊历史纪录存储到 Pika/TairDb 等存储系统中,同时转发单聊消息等操作也基于这个数据流转过程进行。

二、Kafka 的作用

  1. 解耦:Kafka 作为一个中间件,将消息的生产者(如发送单聊消息的客户端、处理登录的模块等)和消费者(如 chatConsumerServer)解耦。这样各个组件可以独立开发、部署和扩展,而不相互影响。例如,当增加新的单聊消息处理模块时,不需要对消息生产者进行修改。
  2. 缓冲:单聊消息和登录结果等数据的产生和处理可能在不同的时间点和速度上进行。Kafka 可以作为一个缓冲区,存储这些数据,以便消费者在合适的时候进行处理。这有助于避免因生产速度快于消费速度而导致的数据丢失或系统压力过大。
  3. 可靠性:Kafka 可以确保数据的可靠传递。即使某个消费者出现故障,消息也不会丢失,而是可以在消费者恢复后继续进行处理。
  4. 可扩展性:随着单聊系统的用户量增加和业务需求的变化,系统需要具备良好的可扩展性。Kafka 可以方便地扩展以处理更多的消息流量,通过增加 broker 节点或分区等方式来提高系统的吞吐量。

三、没有 Kafka 会怎样

  1. 耦合度增加:如果没有 Kafka,消息生产者和消费者之间的耦合度会增加。这意味着任何对消息处理流程的修改都可能需要同时修改多个组件,导致系统的维护和扩展变得困难。
  2. 缺乏缓冲:没有缓冲机制,可能会导致在消息生产速度较快时,消费者无法及时处理所有消息,从而造成数据丢失或系统性能下降。
  3. 可靠性降低:没有可靠的消息传递中间件,消息的可靠性将依赖于各个组件的实现。如果某个组件出现故障,可能会导致消息丢失,影响单聊系统的正常运行。
  4. 可扩展性受限:在没有 Kafka 的情况下,扩展系统以处理更多的消息流量可能会变得更加复杂。可能需要对各个组件进行逐个优化和扩展,而无法像使用 Kafka 那样通过简单地增加节点来提高系统的吞吐量。

简单举例:

#include <iostream>
#include <string>
#include <cstdlib>
#include <ctime>
#include <kafka/Producer.h>
#include <kafka/Consumer.h>// 模拟单聊消息结构体
struct ChatMessage {std::string sender;std::string receiver;std::string content;
};// 消息生产者
void produceMessage(const std::string& topic) {using namespace kafka;Properties props{{"bootstrap.servers", "localhost:9092"}};Producer producer(props);srand(static_cast<unsigned int>(time(nullptr)));std::string senders[] = {"user1", "user2"};std::string receivers[] = {"user3", "user4"};std::string contents[] = {"Hello", "How are you?", "Nice to talk to you."};ChatMessage message;message.sender = senders[rand() % 2];message.receiver = receivers[rand() % 2];message.content = contents[rand() % 3];// 将消息发送到 KafkaProducerRecord record(topic, NullKey, Value(message.sender + "|" + message.receiver + "|" + message.content));producer.produce(record);
}// 已有消息消费者
void consumeMessage(const std::string& topic) {using namespace kafka;Properties props{{"bootstrap.servers", "localhost:9092"},{"group.id", "consumer_group_1"}};Consumer consumer(props);consumer.subscribe({topic});while (true) {ConsumerRecords records = consumer.poll(std::chrono::milliseconds(100));for (const auto& record : records) {std::string messageStr(record.value().toString());std::cout << "Existing consumer: Received message: " << messageStr << std::endl;}}
}// 新增加的消息消费者
void newConsumeMessage(const std::string& topic) {using namespace kafka;Properties props{{"bootstrap.servers", "localhost:9092"},{"group.id", "consumer_group_2"}};Consumer consumer(props);consumer.subscribe({topic});while (true) {ConsumerRecords records = consumer.poll(std::chrono::milliseconds(100));for (const auto& record : records) {std::string messageStr(record.value().toString());std::cout << "New consumer: Received message: " << messageStr << std::endl;}}
}
http://www.lryc.cn/news/452080.html

相关文章:

  • 基础算法之滑动窗口--Java实现(上)--LeetCode题解:长度最小的子数组-无重复字符的子串-最大连续1的个数III-将x减到0的最小操作数
  • Linux -- 文件系统(文件在磁盘中的存储)
  • 微服务(Microservices),服务网格(Service Mesh)以及无服务器运算Serverless简单介绍
  • 【AIGC】AI时代的数据安全:使用ChatGPT时的自查要点
  • 什么是区块链桥?
  • 机器学习框架
  • 金三银四:20道前端手写面试题
  • RAC被修改权限及相关问题
  • Golang | Leetcode Golang题解之第441题排列硬币
  • 数学建模--什么是数学建模?数学建模应该怎么准备?
  • Java项目实战II基于Java+Spring Boot+MySQL的智能物流管理系统(源码+数据库+文档)
  • 【数据分享】2000—2023年我国省市县三级逐月植被覆盖度(FVC)数值(Shp/Excel格式)
  • 《Linux从小白到高手》理论篇(十一):Linux的系统环境管理
  • Qt/C++开源控件 自定义雷达控件
  • 什么是IDE(集成开发环境)?
  • 【Linux】用虚拟机配置Ubuntu 24.04.1 LTS环境
  • MacOS升级Ruby版本详解:步骤、挑战与解决方案
  • Log4j的配置与使用详解
  • docker 的目录有那些,分别存放什么东西
  • 开源模型应用落地-模型微调-语料采集-数据格式化(四)
  • C语言+单片机
  • vmvare虚拟机centos 忘记超级管理员密码怎么办?
  • 使用 Vue3 和 Axios 实现 CRUD 操作
  • .NET MAUI(.NET Multi-platform App UI)下拉选框控件
  • C++平台跳跃游戏
  • 多系统萎缩患者必看!这些维生素助你对抗病魔
  • 深度学习模型性能优化实战之从评估到提升的全流程解析
  • C++ | Leetcode C++题解之第446题等差数列划分II-子序列
  • 【解密 Kotlin 扩展函数】扩展属性与扩展函数类似(十九)
  • 【Spring Boot 入门二】Spring Boot中的配置文件 - 掌控你的应用设置